]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/libsystemd-bus/sd-bus.c
udev: always set selinux label at "add" events
[thirdparty/systemd.git] / src / libsystemd-bus / sd-bus.c
CommitLineData
de1c301e
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include <endian.h>
23#include <assert.h>
24#include <stdlib.h>
25#include <unistd.h>
26#include <netdb.h>
27#include <sys/poll.h>
28#include <byteswap.h>
29
30#include "util.h"
31#include "macro.h"
32
33#include "sd-bus.h"
34#include "bus-internal.h"
35#include "bus-message.h"
36#include "bus-type.h"
37
38#define WQUEUE_MAX 128
39
e3017af9
LP
40static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41
de1c301e
LP
42static void bus_free(sd_bus *b) {
43 struct filter_callback *f;
89ffcd2a 44 unsigned i;
de1c301e
LP
45
46 assert(b);
47
48 if (b->fd >= 0)
49 close_nointr_nofail(b->fd);
50
51 free(b->rbuffer);
89ffcd2a
LP
52 free(b->unique_name);
53 free(b->auth_uid);
54 free(b->address);
55
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
de1c301e 58 free(b->rqueue);
89ffcd2a
LP
59
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
de1c301e 62 free(b->wqueue);
de1c301e
LP
63
64 hashmap_free_free(b->reply_callbacks);
e3017af9 65 prioq_free(b->reply_callbacks_prioq);
de1c301e
LP
66
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69 free(f);
70 }
71
72 free(b);
73}
74
75static sd_bus* bus_new(void) {
76 sd_bus *r;
77
78 r = new0(sd_bus, 1);
79 if (!r)
80 return NULL;
81
82 r->n_ref = 1;
83 r->fd = -1;
84 r->message_version = 1;
85
86 /* We guarantee that wqueue always has space for at least one
87 * entry */
88 r->wqueue = new(sd_bus_message*, 1);
89 if (!r->wqueue) {
90 free(r);
91 return NULL;
92 }
93
94 return r;
95};
96
e3017af9 97static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
de1c301e
LP
98 const char *s;
99 int r;
100
101 assert(bus);
e3017af9
LP
102
103 if (error != 0)
104 return -error;
105
de1c301e
LP
106 assert(reply);
107
108 bus->state = BUS_RUNNING;
109
110 r = sd_bus_message_read(reply, "s", &s);
111 if (r < 0)
112 return r;
113
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
116 return -ENOMEM;
117
118 return 1;
119}
120
121static int bus_send_hello(sd_bus *bus) {
122 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
123 int r;
124
125 assert(bus);
126
127 r = sd_bus_message_new_method_call(
128 bus,
129 "org.freedesktop.DBus",
130 "/",
131 "org.freedesktop.DBus",
132 "Hello",
133 &m);
134 if (r < 0)
135 return r;
136
e3017af9 137 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
de1c301e
LP
138 if (r < 0)
139 return r;
140
89ffcd2a
LP
141 bus->sent_hello = true;
142 return r;
de1c301e
LP
143}
144
145static int bus_start_running(sd_bus *bus) {
de1c301e
LP
146 assert(bus);
147
89ffcd2a 148 if (bus->sent_hello) {
de1c301e 149 bus->state = BUS_HELLO;
e3017af9 150 return 1;
de1c301e
LP
151 }
152
153 bus->state = BUS_RUNNING;
e3017af9 154 return 1;
de1c301e
LP
155}
156
157static int parse_address_key(const char **p, const char *key, char **value) {
158 size_t l, n = 0;
159 const char *a;
160 char *r = NULL;
161
162 assert(p);
163 assert(*p);
164 assert(key);
165 assert(value);
166
167 l = strlen(key);
89ffcd2a 168 if (strncmp(*p, key, l) != 0)
de1c301e
LP
169 return 0;
170
171 if ((*p)[l] != '=')
172 return 0;
173
174 if (*value)
175 return -EINVAL;
176
177 a = *p + l + 1;
89ffcd2a 178 while (*a != ',' && *a != 0) {
de1c301e
LP
179 char c, *t;
180
181 if (*a == '%') {
182 int x, y;
183
184 x = unhexchar(a[1]);
185 if (x < 0) {
186 free(r);
187 return x;
188 }
189
190 y = unhexchar(a[2]);
191 if (y < 0) {
192 free(r);
193 return y;
194 }
195
de1c301e 196 c = (char) ((x << 4) | y);
89ffcd2a
LP
197 a += 3;
198 } else {
de1c301e 199 c = *a;
89ffcd2a
LP
200 a++;
201 }
de1c301e 202
89ffcd2a 203 t = realloc(r, n + 2);
de1c301e
LP
204 if (!t) {
205 free(r);
206 return -ENOMEM;
207 }
208
209 r = t;
210 r[n++] = c;
211 }
212
89ffcd2a
LP
213 if (!r) {
214 r = strdup("");
215 if (!r)
216 return -ENOMEM;
217 } else
218 r[n] = 0;
219
220 if (*a == ',')
221 a++;
222
de1c301e
LP
223 *p = a;
224 *value = r;
225 return 1;
226}
227
228static void skip_address_key(const char **p) {
229 assert(p);
230 assert(*p);
231
89ffcd2a
LP
232 *p += strcspn(*p, ",");
233
234 if (**p == ',')
235 (*p) ++;
de1c301e
LP
236}
237
238static int bus_parse_next_address(sd_bus *b) {
239 const char *a, *p;
240 _cleanup_free_ char *guid = NULL;
241 int r;
242
243 assert(b);
244
245 if (!b->address)
246 return 0;
247 if (b->address[b->address_index] == 0)
248 return 0;
249
250 a = b->address + b->address_index;
251
252 zero(b->sockaddr);
253 b->sockaddr_size = 0;
254 b->peer = SD_ID128_NULL;
255
256 if (startswith(a, "unix:")) {
257 _cleanup_free_ char *path = NULL, *abstract = NULL;
258
259 p = a + 5;
89ffcd2a 260 while (*p != 0) {
de1c301e
LP
261 r = parse_address_key(&p, "guid", &guid);
262 if (r < 0)
263 return r;
264 else if (r > 0)
265 continue;
266
267 r = parse_address_key(&p, "path", &path);
268 if (r < 0)
269 return r;
270 else if (r > 0)
271 continue;
272
273 r = parse_address_key(&p, "abstract", &abstract);
274 if (r < 0)
275 return r;
276 else if (r > 0)
277 continue;
278
279 skip_address_key(&p);
280 }
281
282 if (!path && !abstract)
283 return -EINVAL;
284
285 if (path && abstract)
286 return -EINVAL;
287
288 if (path) {
289 size_t l;
290
291 l = strlen(path);
292 if (l > sizeof(b->sockaddr.un.sun_path))
293 return -E2BIG;
294
295 b->sockaddr.un.sun_family = AF_UNIX;
296 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298 } else if (abstract) {
299 size_t l;
300
89ffcd2a 301 l = strlen(abstract);
de1c301e
LP
302 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
303 return -E2BIG;
304
305 b->sockaddr.un.sun_family = AF_UNIX;
306 b->sockaddr.un.sun_path[0] = 0;
89ffcd2a 307 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
de1c301e
LP
308 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
309 }
310
311 } else if (startswith(a, "tcp:")) {
312 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313 struct addrinfo hints, *result;
314
315 p = a + 4;
89ffcd2a 316 while (*p != 0) {
de1c301e
LP
317 r = parse_address_key(&p, "guid", &guid);
318 if (r < 0)
319 return r;
320 else if (r > 0)
321 continue;
322
323 r = parse_address_key(&p, "host", &host);
324 if (r < 0)
325 return r;
326 else if (r > 0)
327 continue;
328
329 r = parse_address_key(&p, "port", &port);
330 if (r < 0)
331 return r;
332 else if (r > 0)
333 continue;
334
335 r = parse_address_key(&p, "family", &family);
336 if (r < 0)
337 return r;
338 else if (r > 0)
339 continue;
340
341 skip_address_key(&p);
342 }
343
344 if (!host || !port)
345 return -EINVAL;
346
347 zero(hints);
348 hints.ai_socktype = SOCK_STREAM;
349 hints.ai_flags = AI_ADDRCONFIG;
350
351 if (family) {
352 if (streq(family, "ipv4"))
353 hints.ai_family = AF_INET;
354 else if (streq(family, "ipv6"))
355 hints.ai_family = AF_INET6;
356 else
357 return -EINVAL;
358 }
359
360 r = getaddrinfo(host, port, &hints, &result);
361 if (r == EAI_SYSTEM)
362 return -errno;
363 else if (r != 0)
364 return -EADDRNOTAVAIL;
365
366 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367 b->sockaddr_size = result->ai_addrlen;
368
369 freeaddrinfo(result);
370 }
371
372 if (guid) {
373 r = sd_id128_from_string(guid, &b->peer);
374 if (r < 0)
375 return r;
376 }
377
378 b->address_index = p - b->address;
379 return 1;
380}
381
382static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
383
384 while (size > 0) {
385 struct iovec *i = iov + *idx;
386
387 if (i->iov_len > size) {
388 i->iov_base = (uint8_t*) i->iov_base + size;
389 i->iov_len -= size;
390 return;
391 }
392
393 size -= i->iov_len;
394
395 i->iov_base = NULL;
396 i->iov_len = 0;
397
398 (*idx) ++;
399 }
400}
401
402static int bus_write_auth(sd_bus *b) {
403 struct msghdr mh;
404 ssize_t k;
405
406 assert(b);
407 assert(b->state == BUS_AUTHENTICATING);
408
409 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
410 return 0;
411
e3017af9
LP
412 if (b->auth_timeout == 0)
413 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
414
de1c301e
LP
415 zero(mh);
416 mh.msg_iov = b->auth_iovec + b->auth_index;
417 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
418
419 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
420 if (k < 0)
421 return errno == EAGAIN ? 0 : -errno;
422
423 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
424
425 return 1;
426}
427
428static int bus_auth_verify(sd_bus *b) {
429 char *e, *f;
430 sd_id128_t peer;
431 unsigned i;
432 int r;
433
434 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
435 * that's it */
436
437 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
438 if (!e)
439 return 0;
440
89ffcd2a 441 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
de1c301e
LP
442 if (!f)
443 return 0;
444
445 if (e - (char*) b->rbuffer != 3 + 32)
446 return -EPERM;
447
448 if (memcmp(b->rbuffer, "OK ", 3))
449 return -EPERM;
450
451 for (i = 0; i < 32; i += 2) {
452 int x, y;
453
454 x = unhexchar(((char*) b->rbuffer)[3 + i]);
89ffcd2a 455 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
de1c301e
LP
456
457 if (x < 0 || y < 0)
458 return -EINVAL;
459
460 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
461 }
462
463 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464 !sd_id128_equal(b->peer, peer))
465 return -EPERM;
466
467 b->peer = peer;
468
469 b->can_fds =
470 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
472
e3017af9
LP
473 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474 memmove(b->rbuffer, f + 2, b->rbuffer_size);
89ffcd2a 475
de1c301e
LP
476 r = bus_start_running(b);
477 if (r < 0)
478 return r;
479
480 return 1;
481}
482
483static int bus_read_auth(sd_bus *b) {
484 struct msghdr mh;
485 struct iovec iov;
486 size_t n;
487 ssize_t k;
488 int r;
89ffcd2a 489 void *p;
de1c301e
LP
490
491 assert(b);
492
493 r = bus_auth_verify(b);
494 if (r != 0)
495 return r;
496
497 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
89ffcd2a
LP
498 p = realloc(b->rbuffer, n);
499 if (!p)
500 return -ENOMEM;
501
502 b->rbuffer = p;
de1c301e
LP
503
504 zero(iov);
505 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506 iov.iov_len = n - b->rbuffer_size;
507
508 zero(mh);
509 mh.msg_iov = &iov;
510 mh.msg_iovlen = 1;
511
512 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
513 if (k < 0)
514 return errno == EAGAIN ? 0 : -errno;
515
516 b->rbuffer_size += k;
517
518 r = bus_auth_verify(b);
519 if (r != 0)
520 return r;
521
e3017af9 522 return 1;
de1c301e
LP
523}
524
525static int bus_start_auth(sd_bus *b) {
89ffcd2a 526 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
de1c301e
LP
527 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
528
529 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
530 size_t l;
531
532 assert(b);
533
534 b->state = BUS_AUTHENTICATING;
535
536 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
537 char_array_0(text);
538
539 l = strlen(text);
540 b->auth_uid = hexmem(text, l);
541 if (!b->auth_uid)
542 return -ENOMEM;
543
544 b->auth_iovec[0].iov_base = (void*) auth_prefix;
545 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547 b->auth_iovec[1].iov_len = l * 2;
548 b->auth_iovec[2].iov_base = (void*) auth_suffix;
549 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
551
552 return bus_write_auth(b);
553}
554
555static int bus_start_connect(sd_bus *b) {
556 int r;
557
558 assert(b);
559 assert(b->fd < 0);
560
561 for (;;) {
562 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563 r = bus_parse_next_address(b);
564 if (r < 0)
565 return r;
566 if (r == 0)
e3017af9 567 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
de1c301e
LP
568 }
569
570 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
571 if (b->fd < 0) {
e3017af9 572 b->last_connect_error = errno;
de1c301e
LP
573 zero(b->sockaddr);
574 continue;
575 }
576
577 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
578 if (r < 0) {
579 if (errno == EINPROGRESS)
e3017af9 580 return 1;
de1c301e 581
e3017af9 582 b->last_connect_error = errno;
de1c301e
LP
583 close_nointr_nofail(b->fd);
584 b->fd = -1;
585 zero(b->sockaddr);
586 continue;
587 }
588
589 return bus_start_auth(b);
590 }
591}
592
593int sd_bus_open_system(sd_bus **ret) {
594 const char *e;
595 sd_bus *b;
596 int r;
597
598 if (!ret)
599 return -EINVAL;
600
601 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
602 if (e) {
603 r = sd_bus_open_address(e, &b);
604 if (r < 0)
605 return r;
89ffcd2a
LP
606 } else {
607 b = bus_new();
608 if (!b)
609 return -ENOMEM;
de1c301e 610
89ffcd2a
LP
611 b->sockaddr.un.sun_family = AF_UNIX;
612 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
de1c301e 614
89ffcd2a
LP
615 r = bus_start_connect(b);
616 if (r < 0) {
617 bus_free(b);
618 return r;
619 }
620 }
de1c301e 621
89ffcd2a 622 r = bus_send_hello(b);
de1c301e 623 if (r < 0) {
89ffcd2a 624 sd_bus_unref(b);
de1c301e
LP
625 return r;
626 }
627
628 *ret = b;
629 return 0;
630}
631
632int sd_bus_open_user(sd_bus **ret) {
633 const char *e;
634 sd_bus *b;
635 size_t l;
636 int r;
637
638 if (!ret)
639 return -EINVAL;
640
641 e = getenv("DBUS_SESSION_BUS_ADDRESS");
642 if (e) {
643 r = sd_bus_open_address(e, &b);
644 if (r < 0)
645 return r;
89ffcd2a
LP
646 } else {
647 e = getenv("XDG_RUNTIME_DIR");
648 if (!e)
649 return -ENOENT;
de1c301e 650
89ffcd2a
LP
651 l = strlen(e);
652 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
653 return -E2BIG;
de1c301e 654
89ffcd2a
LP
655 b = bus_new();
656 if (!b)
657 return -ENOMEM;
de1c301e 658
89ffcd2a
LP
659 b->sockaddr.un.sun_family = AF_UNIX;
660 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
de1c301e 662
89ffcd2a
LP
663 r = bus_start_connect(b);
664 if (r < 0) {
665 bus_free(b);
666 return r;
667 }
668 }
de1c301e 669
89ffcd2a 670 r = bus_send_hello(b);
de1c301e 671 if (r < 0) {
89ffcd2a 672 sd_bus_unref(b);
de1c301e
LP
673 return r;
674 }
675
676 *ret = b;
677 return 0;
678}
679
680int sd_bus_open_address(const char *address, sd_bus **ret) {
681 sd_bus *b;
682 int r;
683
684 if (!address)
685 return -EINVAL;
686 if (!ret)
687 return -EINVAL;
688
689 b = bus_new();
690 if (!b)
691 return -ENOMEM;
692
693 b->address = strdup(address);
694 if (!b->address) {
695 bus_free(b);
696 return -ENOMEM;
697 }
698
699 r = bus_start_connect(b);
700 if (r < 0) {
701 bus_free(b);
702 return r;
703 }
704
705 *ret = b;
706 return 0;
707}
708
709int sd_bus_open_fd(int fd, sd_bus **ret) {
710 sd_bus *b;
711 int r;
712
713 if (fd < 0)
714 return -EINVAL;
715 if (!ret)
716 return -EINVAL;
717
718 b = bus_new();
719 if (!b)
720 return -ENOMEM;
721
722 b->fd = fd;
723 fd_nonblock(b->fd, true);
724 fd_cloexec(b->fd, true);
725
726 r = bus_start_auth(b);
727 if (r < 0) {
728 bus_free(b);
729 return r;
730 }
731
732 *ret = b;
733 return 0;
734}
735
736void sd_bus_close(sd_bus *bus) {
737 if (!bus)
738 return;
739 if (bus->fd < 0)
740 return;
741
742 close_nointr_nofail(bus->fd);
743 bus->fd = -1;
744}
745
746sd_bus *sd_bus_ref(sd_bus *bus) {
747 if (!bus)
748 return NULL;
749
750 assert(bus->n_ref > 0);
751
752 bus->n_ref++;
753 return bus;
754}
755
756sd_bus *sd_bus_unref(sd_bus *bus) {
757 if (!bus)
758 return NULL;
759
760 assert(bus->n_ref > 0);
761 bus->n_ref--;
762
763 if (bus->n_ref <= 0)
764 bus_free(bus);
765
766 return NULL;
767}
768
e3017af9
LP
769int sd_bus_is_open(sd_bus *bus) {
770 if (!bus)
771 return -EINVAL;
772
773 return bus->fd >= 0;
774}
775
de1c301e
LP
776int sd_bus_is_running(sd_bus *bus) {
777 if (!bus)
778 return -EINVAL;
779
780 if (bus->fd < 0)
781 return -ENOTCONN;
782
783 return bus->state == BUS_RUNNING;
784}
785
786int sd_bus_can_send(sd_bus *bus, char type) {
787
788 if (!bus)
789 return -EINVAL;
89ffcd2a
LP
790 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
791 return -EAGAIN;
de1c301e
LP
792
793 if (type == SD_BUS_TYPE_UNIX_FD)
794 return bus->can_fds;
795
796 return bus_type_is_valid(type);
797}
798
799static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
800 assert(m);
801
89ffcd2a
LP
802 if (m->header->version > b->message_version)
803 return -EPERM;
804
de1c301e
LP
805 if (m->sealed)
806 return 0;
807
9a17484d 808 return bus_message_seal(m, ++b->serial);
de1c301e
LP
809}
810
811static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
812 struct msghdr mh;
813 struct iovec *iov;
814 ssize_t k;
815 size_t n;
816 unsigned j;
817
818 assert(bus);
819 assert(m);
820 assert(idx);
89ffcd2a 821 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 822
e3017af9
LP
823 if (*idx >= m->size)
824 return 0;
825
de1c301e
LP
826 n = m->n_iovec * sizeof(struct iovec);
827 iov = alloca(n);
828 memcpy(iov, m->iovec, n);
829
830 j = 0;
831 iovec_advance(iov, &j, *idx);
832
833 zero(mh);
834 mh.msg_iov = iov;
835 mh.msg_iovlen = m->n_iovec;
836
837 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
838 if (k < 0)
e3017af9 839 return errno == EAGAIN ? 0 : -errno;
de1c301e
LP
840
841 *idx += (size_t) k;
e3017af9 842 return 1;
de1c301e
LP
843}
844
845static int message_read_need(sd_bus *bus, size_t *need) {
846 uint32_t a, b;
847 uint8_t e;
848
849 assert(bus);
850 assert(need);
89ffcd2a 851 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 852
89ffcd2a 853 if (bus->rbuffer_size < sizeof(struct bus_header)) {
e3017af9
LP
854 *need = sizeof(struct bus_header) + 8;
855
856 /* Minimum message size:
857 *
858 * Header +
859 *
860 * Method Call: +2 string headers
861 * Signal: +3 string headers
862 * Method Error: +1 string headers
863 * +1 uint32 headers
864 * Method Reply: +1 uint32 headers
865 *
866 * A string header is at least 9 bytes
867 * A uint32 header is at least 8 bytes
868 *
869 * Hence the minimum message size of a valid message
870 * is header + 8 bytes */
871
de1c301e
LP
872 return 0;
873 }
874
875 a = ((const uint32_t*) bus->rbuffer)[1];
876 b = ((const uint32_t*) bus->rbuffer)[3];
877
878 e = ((const uint8_t*) bus->rbuffer)[0];
879 if (e == SD_BUS_LITTLE_ENDIAN) {
880 a = le32toh(a);
881 b = le32toh(b);
882 } else if (e == SD_BUS_BIG_ENDIAN) {
883 a = be32toh(a);
884 b = be32toh(b);
885 } else
89ffcd2a 886 return -EBADMSG;
de1c301e 887
89ffcd2a 888 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
de1c301e
LP
889 return 0;
890}
891
892static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
893 sd_bus_message *t;
894 void *b = NULL;
895 int r;
896
897 assert(bus);
898 assert(m);
899 assert(bus->rbuffer_size >= size);
89ffcd2a 900 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 901
de1c301e
LP
902 if (bus->rbuffer_size > size) {
903 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
904 if (!b) {
905 free(t);
906 return -ENOMEM;
907 }
908 }
909
80a46c73
LP
910 r = bus_message_from_malloc(bus->rbuffer, size, &t);
911 if (r < 0) {
912 free(b);
913 return r;
914 }
de1c301e
LP
915
916 bus->rbuffer = b;
917 bus->rbuffer_size -= size;
918
de1c301e
LP
919 *m = t;
920 return 1;
921}
922
923static int message_read(sd_bus *bus, sd_bus_message **m) {
924 struct msghdr mh;
925 struct iovec iov;
926 ssize_t k;
927 size_t need;
928 int r;
929 void *b;
930
931 assert(bus);
932 assert(m);
89ffcd2a 933 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
934
935 r = message_read_need(bus, &need);
936 if (r < 0)
937 return r;
938
939 if (bus->rbuffer_size >= need)
940 return message_make(bus, need, m);
941
942 b = realloc(bus->rbuffer, need);
943 if (!b)
944 return -ENOMEM;
945
89ffcd2a
LP
946 bus->rbuffer = b;
947
de1c301e
LP
948 zero(iov);
949 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950 iov.iov_len = need - bus->rbuffer_size;
951
952 zero(mh);
953 mh.msg_iov = &iov;
954 mh.msg_iovlen = 1;
955
956 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
957 if (k < 0)
958 return errno == EAGAIN ? 0 : -errno;
959
960 bus->rbuffer_size += k;
961
962 r = message_read_need(bus, &need);
963 if (r < 0)
964 return r;
965
966 if (bus->rbuffer_size >= need)
967 return message_make(bus, need, m);
968
e3017af9 969 return 1;
de1c301e
LP
970}
971
972static int dispatch_wqueue(sd_bus *bus) {
e3017af9 973 int r, ret = 0;
de1c301e
LP
974
975 assert(bus);
89ffcd2a 976 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
977
978 if (bus->fd < 0)
979 return -ENOTCONN;
980
981 while (bus->wqueue_size > 0) {
982
983 r = message_write(bus, bus->wqueue[0], &bus->windex);
984 if (r < 0) {
985 sd_bus_close(bus);
986 return r;
987 } else if (r == 0)
e3017af9
LP
988 /* Didn't do anything this time */
989 return ret;
990 else if (bus->windex >= bus->wqueue[0]->size) {
de1c301e
LP
991 /* Fully written. Let's drop the entry from
992 * the queue.
993 *
994 * This isn't particularly optimized, but
995 * well, this is supposed to be our worst-case
996 * buffer only, and the socket buffer is
997 * supposed to be our primary buffer, and if
998 * it got full, then all bets are off
999 * anyway. */
1000
1001 sd_bus_message_unref(bus->wqueue[0]);
1002 bus->wqueue_size --;
1003 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1004 bus->windex = 0;
1005
e3017af9 1006 ret = 1;
de1c301e
LP
1007 }
1008 }
1009
e3017af9 1010 return ret;
de1c301e
LP
1011}
1012
1013static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
e3017af9
LP
1014 sd_bus_message *z;
1015 int r, ret = 0;
de1c301e
LP
1016
1017 assert(bus);
1018 assert(m);
89ffcd2a 1019 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
1020
1021 if (bus->fd < 0)
1022 return -ENOTCONN;
1023
1024 if (bus->rqueue_size > 0) {
1025 /* Dispatch a queued message */
1026
1027 *m = bus->rqueue[0];
1028 bus->rqueue_size --;
1029 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1030 return 1;
1031 }
1032
1033 /* Try to read a new message */
e3017af9
LP
1034 do {
1035 r = message_read(bus, &z);
1036 if (r < 0) {
1037 sd_bus_close(bus);
1038 return r;
1039 }
1040 if (r == 0)
1041 return ret;
de1c301e 1042
e3017af9
LP
1043 r = 1;
1044 } while (!z);
1045
1046 *m = z;
1047 return 1;
de1c301e
LP
1048}
1049
1050int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1051 int r;
1052
1053 if (!bus)
1054 return -EINVAL;
1055 if (bus->fd < 0)
1056 return -ENOTCONN;
1057 if (!m)
1058 return -EINVAL;
de1c301e
LP
1059
1060 r = bus_seal_message(bus, m);
1061 if (r < 0)
1062 return r;
1063
5407f2de
LP
1064 /* If this is a reply and no reply was requested, then let's
1065 * suppress this, if we can */
1066 if (m->dont_send && !serial)
1067 return 0;
1068
89ffcd2a 1069 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
de1c301e
LP
1070 size_t idx = 0;
1071
1072 r = message_write(bus, m, &idx);
1073 if (r < 0) {
1074 sd_bus_close(bus);
1075 return r;
e3017af9 1076 } else if (idx < m->size) {
de1c301e
LP
1077 /* Wasn't fully written. So let's remember how
1078 * much was written. Note that the first entry
1079 * of the wqueue array is always allocated so
1080 * that we always can remember how much was
1081 * written. */
1082 bus->wqueue[0] = sd_bus_message_ref(m);
1083 bus->wqueue_size = 1;
1084 bus->windex = idx;
1085 }
1086 } else {
1087 sd_bus_message **q;
1088
1089 /* Just append it to the queue. */
1090
1091 if (bus->wqueue_size >= WQUEUE_MAX)
1092 return -ENOBUFS;
1093
1094 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1095 if (!q)
1096 return -ENOMEM;
1097
1098 bus->wqueue = q;
1099 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1100 }
1101
1102 if (serial)
1103 *serial = BUS_MESSAGE_SERIAL(m);
1104
1105 return 0;
1106}
1107
1108static usec_t calc_elapse(uint64_t usec) {
1109 if (usec == (uint64_t) -1)
1110 return 0;
1111
1112 if (usec == 0)
e3017af9 1113 usec = BUS_DEFAULT_TIMEOUT;
de1c301e
LP
1114
1115 return now(CLOCK_MONOTONIC) + usec;
1116}
1117
e3017af9
LP
1118static int timeout_compare(const void *a, const void *b) {
1119 const struct reply_callback *x = a, *y = b;
1120
1121 if (x->timeout != 0 && y->timeout == 0)
1122 return -1;
1123
1124 if (x->timeout == 0 && y->timeout != 0)
1125 return 1;
1126
1127 if (x->timeout < y->timeout)
1128 return -1;
1129
1130 if (x->timeout > y->timeout)
1131 return 1;
1132
1133 return 0;
1134}
1135
de1c301e
LP
1136int sd_bus_send_with_reply(
1137 sd_bus *bus,
1138 sd_bus_message *m,
1139 sd_message_handler_t callback,
1140 void *userdata,
1141 uint64_t usec,
1142 uint64_t *serial) {
1143
1144 struct reply_callback *c;
1145 int r;
1146
1147 if (!bus)
1148 return -EINVAL;
89ffcd2a 1149 if (bus->fd < 0)
de1c301e
LP
1150 return -ENOTCONN;
1151 if (!m)
1152 return -EINVAL;
1153 if (!callback)
1154 return -EINVAL;
89ffcd2a 1155 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1156 return -EINVAL;
89ffcd2a
LP
1157 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1158 return -EINVAL;
1159
1160 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1161 if (r < 0)
1162 return r;
de1c301e 1163
e3017af9
LP
1164 if (usec != (uint64_t) -1) {
1165 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1166 if (r < 0)
1167 return r;
1168 }
1169
de1c301e
LP
1170 r = bus_seal_message(bus, m);
1171 if (r < 0)
1172 return r;
1173
1174 c = new(struct reply_callback, 1);
1175 if (!c)
1176 return -ENOMEM;
1177
1178 c->callback = callback;
1179 c->userdata = userdata;
1180 c->serial = BUS_MESSAGE_SERIAL(m);
1181 c->timeout = calc_elapse(usec);
1182
1183 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1184 if (r < 0) {
1185 free(c);
1186 return r;
1187 }
1188
e3017af9
LP
1189 if (c->timeout != 0) {
1190 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1191 if (r < 0) {
1192 c->timeout = 0;
1193 sd_bus_send_with_reply_cancel(bus, c->serial);
1194 return r;
1195 }
1196 }
1197
de1c301e
LP
1198 r = sd_bus_send(bus, m, serial);
1199 if (r < 0) {
e3017af9 1200 sd_bus_send_with_reply_cancel(bus, c->serial);
de1c301e
LP
1201 return r;
1202 }
1203
1204 return r;
1205}
1206
1207int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
e3017af9 1208 struct reply_callback *c;
de1c301e
LP
1209
1210 if (!bus)
1211 return -EINVAL;
1212 if (serial == 0)
1213 return -EINVAL;
1214
1215 c = hashmap_remove(bus->reply_callbacks, &serial);
1216 if (!c)
1217 return 0;
1218
e3017af9
LP
1219 if (c->timeout != 0)
1220 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1221
de1c301e
LP
1222 free(c);
1223 return 1;
1224}
1225
89ffcd2a
LP
1226static int ensure_running(sd_bus *bus) {
1227 int r;
1228
1229 assert(bus);
1230
1231 r = sd_bus_is_running(bus);
1232 if (r != 0)
1233 return r;
1234
1235 for (;;) {
e3017af9
LP
1236 int k;
1237
89ffcd2a 1238 r = sd_bus_process(bus, NULL);
e3017af9 1239
89ffcd2a
LP
1240 if (r < 0)
1241 return r;
1242
e3017af9
LP
1243 k = sd_bus_is_running(bus);
1244 if (k != 0)
1245 return k;
1246
1247 if (r > 0)
1248 continue;
89ffcd2a
LP
1249
1250 r = sd_bus_wait(bus, (uint64_t) -1);
1251 if (r < 0)
1252 return r;
1253 }
1254}
1255
de1c301e
LP
1256int sd_bus_send_with_reply_and_block(
1257 sd_bus *bus,
1258 sd_bus_message *m,
1259 uint64_t usec,
1260 sd_bus_error *error,
1261 sd_bus_message **reply) {
1262
1263 int r;
1264 usec_t timeout;
1265 uint64_t serial;
1266 bool room = false;
1267
1268 if (!bus)
1269 return -EINVAL;
89ffcd2a 1270 if (bus->fd < 0)
de1c301e
LP
1271 return -ENOTCONN;
1272 if (!m)
1273 return -EINVAL;
89ffcd2a 1274 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1275 return -EINVAL;
89ffcd2a 1276 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
de1c301e 1277 return -EINVAL;
89ffcd2a
LP
1278 if (bus_error_is_dirty(error))
1279 return -EINVAL;
1280
1281 r = ensure_running(bus);
1282 if (r < 0)
1283 return r;
de1c301e
LP
1284
1285 r = sd_bus_send(bus, m, &serial);
1286 if (r < 0)
1287 return r;
1288
1289 timeout = calc_elapse(usec);
1290
1291 for (;;) {
1292 usec_t left;
e3017af9 1293 sd_bus_message *incoming = NULL;
de1c301e
LP
1294
1295 if (!room) {
1296 sd_bus_message **q;
1297
1298 /* Make sure there's room for queuing this
1299 * locally, before we read the message */
1300
1301 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1302 if (!q)
1303 return -ENOMEM;
1304
1305 bus->rqueue = q;
1306 room = true;
1307 }
1308
1309 r = message_read(bus, &incoming);
1310 if (r < 0)
1311 return r;
e3017af9 1312 if (incoming) {
89ffcd2a 1313
de1c301e
LP
1314 if (incoming->reply_serial == serial) {
1315 /* Found a match! */
1316
1317 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1318 *reply = incoming;
1319 return 0;
1320 }
1321
1322 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1323 int k;
1324
1325 r = sd_bus_error_copy(error, &incoming->error);
1326 if (r < 0) {
1327 sd_bus_message_unref(incoming);
1328 return r;
1329 }
1330
1331 k = bus_error_to_errno(&incoming->error);
1332 sd_bus_message_unref(incoming);
1333 return k;
1334 }
1335
1336 sd_bus_message_unref(incoming);
1337 return -EIO;
1338 }
1339
1340 /* There's already guaranteed to be room for
1341 * this, so need to resize things here */
1342 bus->rqueue[bus->rqueue_size ++] = incoming;
1343 room = false;
1344
1345 /* Try to read more, right-away */
1346 continue;
1347 }
e3017af9
LP
1348 if (r != 0)
1349 continue;
de1c301e
LP
1350
1351 if (timeout > 0) {
1352 usec_t n;
1353
1354 n = now(CLOCK_MONOTONIC);
1355 if (n >= timeout)
1356 return -ETIMEDOUT;
1357
1358 left = timeout - n;
1359 } else
1360 left = (uint64_t) -1;
1361
e3017af9 1362 r = bus_poll(bus, true, left);
de1c301e
LP
1363 if (r < 0)
1364 return r;
1365
1366 r = dispatch_wqueue(bus);
1367 if (r < 0)
1368 return r;
1369 }
1370}
1371
1372int sd_bus_get_fd(sd_bus *bus) {
1373 if (!bus)
1374 return -EINVAL;
1375
1376 if (bus->fd < 0)
89ffcd2a 1377 return -ENOTCONN;
de1c301e
LP
1378
1379 return bus->fd;
1380}
1381
1382int sd_bus_get_events(sd_bus *bus) {
1383 int flags = 0;
1384
1385 if (!bus)
1386 return -EINVAL;
de1c301e 1387 if (bus->fd < 0)
89ffcd2a 1388 return -ENOTCONN;
de1c301e
LP
1389
1390 if (bus->state == BUS_OPENING)
1391 flags |= POLLOUT;
89ffcd2a
LP
1392 else if (bus->state == BUS_AUTHENTICATING) {
1393
1394 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1395 flags |= POLLOUT;
1396
1397 flags |= POLLIN;
1398
1399 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
de1c301e
LP
1400 if (bus->rqueue_size <= 0)
1401 flags |= POLLIN;
1402 if (bus->wqueue_size > 0)
1403 flags |= POLLOUT;
1404 }
1405
1406 return flags;
1407}
1408
e3017af9
LP
1409int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1410 struct reply_callback *c;
1411
1412 if (!bus)
1413 return -EINVAL;
1414 if (!timeout_usec)
1415 return -EINVAL;
1416 if (bus->fd < 0)
1417 return -ENOTCONN;
1418
1419 if (bus->state == BUS_AUTHENTICATING) {
1420 *timeout_usec = bus->auth_timeout;
1421 return 1;
1422 }
1423
1424 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1425 return 0;
1426
1427 c = prioq_peek(bus->reply_callbacks_prioq);
1428 if (!c)
1429 return 0;
1430
1431 *timeout_usec = c->timeout;
1432 return 1;
1433}
1434
1435static int process_timeout(sd_bus *bus) {
1436 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1437 struct reply_callback *c;
1438 usec_t n;
1439 int r;
1440
1441 assert(bus);
1442
1443 c = prioq_peek(bus->reply_callbacks_prioq);
1444 if (!c)
1445 return 0;
1446
1447 n = now(CLOCK_MONOTONIC);
1448 if (c->timeout > n)
1449 return 0;
1450
1451 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1452 hashmap_remove(bus->reply_callbacks, &c->serial);
1453
1454 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1455 free(c);
1456
1457 return r < 0 ? r : 1;
1458}
1459
1460static int process_message(sd_bus *bus, sd_bus_message *m) {
1461 struct filter_callback *l;
1462 int r;
1463
1464 assert(bus);
1465 assert(m);
1466
1467 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1468 struct reply_callback *c;
1469
1470 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1471 if (c) {
1472 if (c->timeout != 0)
1473 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1474
1475 r = c->callback(bus, 0, m, c->userdata);
1476 free(c);
1477
1478 if (r != 0)
1479 return r;
1480 }
1481 }
1482
1483 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1484 r = l->callback(bus, 0, m, l->userdata);
1485 if (r != 0)
1486 return r;
1487 }
1488
1489 return 0;
1490}
1491
de1c301e 1492int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
de1c301e
LP
1493 int r;
1494
e3017af9
LP
1495 /* Returns 0 when we didn't do anything. This should cause the
1496 * caller to invoke sd_bus_wait() before returning the next
1497 * time. Returns > 0 when we did something, which possibly
1498 * means *ret is filled in with an unprocessed message. */
1499
de1c301e
LP
1500 if (!bus)
1501 return -EINVAL;
1502 if (bus->fd < 0)
1503 return -ENOTCONN;
1504
1505 if (bus->state == BUS_OPENING) {
1506 struct pollfd p;
1507
1508 zero(p);
1509 p.fd = bus->fd;
1510 p.events = POLLOUT;
1511
1512 r = poll(&p, 1, 0);
1513 if (r < 0)
1514 return -errno;
1515
1516 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
89ffcd2a 1517 int error = 0;
de1c301e
LP
1518 socklen_t slen = sizeof(error);
1519
1520 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1521 if (r < 0)
e3017af9
LP
1522 bus->last_connect_error = errno;
1523 else if (error != 0)
1524 bus->last_connect_error = error;
de1c301e 1525 else if (p.revents & (POLLERR|POLLHUP))
e3017af9
LP
1526 bus->last_connect_error = ECONNREFUSED;
1527 else {
1528 r = bus_start_auth(bus);
1529 goto null_message;
1530 }
de1c301e
LP
1531
1532 /* Try next address */
e3017af9
LP
1533 r = bus_start_connect(bus);
1534 goto null_message;
de1c301e
LP
1535 }
1536
e3017af9
LP
1537 r = 0;
1538 goto null_message;
de1c301e
LP
1539
1540 } else if (bus->state == BUS_AUTHENTICATING) {
1541
e3017af9
LP
1542 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1543 return -ETIMEDOUT;
1544
de1c301e 1545 r = bus_write_auth(bus);
e3017af9
LP
1546 if (r != 0)
1547 goto null_message;
de1c301e
LP
1548
1549 r = bus_read_auth(bus);
e3017af9 1550 goto null_message;
de1c301e
LP
1551
1552 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
89ffcd2a 1553 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
e3017af9
LP
1554 int k;
1555
1556 r = process_timeout(bus);
1557 if (r != 0)
1558 goto null_message;
de1c301e
LP
1559
1560 r = dispatch_wqueue(bus);
e3017af9
LP
1561 if (r != 0)
1562 goto null_message;
de1c301e 1563
e3017af9 1564 k = r;
de1c301e 1565 r = dispatch_rqueue(bus, &m);
e3017af9 1566 if (r < 0)
de1c301e 1567 return r;
e3017af9
LP
1568 if (!m) {
1569 if (r == 0)
1570 r = k;
1571 goto null_message;
de1c301e
LP
1572 }
1573
e3017af9
LP
1574 r = process_message(bus, m);
1575 if (r != 0)
1576 goto null_message;
de1c301e
LP
1577
1578 if (ret) {
1579 *ret = m;
89ffcd2a 1580 m = NULL;
de1c301e
LP
1581 return 1;
1582 }
1583
89ffcd2a
LP
1584 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1585 const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1586 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1587
1588 r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1589 if (r < 0)
1590 return r;
1591
1592 r = sd_bus_send(bus, reply, NULL);
1593 if (r < 0)
1594 return r;
1595 }
1596
e3017af9 1597 return 1;
de1c301e
LP
1598 }
1599
89ffcd2a 1600 assert_not_reached("Unknown state");
e3017af9
LP
1601
1602null_message:
1603 if (r >= 0 && ret)
1604 *ret = NULL;
1605
1606 return r;
de1c301e
LP
1607}
1608
e3017af9 1609static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
de1c301e
LP
1610 struct pollfd p;
1611 int r, e;
1612 struct timespec ts;
e3017af9
LP
1613 usec_t until, m;
1614
1615 assert(bus);
de1c301e 1616
de1c301e 1617 if (bus->fd < 0)
89ffcd2a
LP
1618 return -ENOTCONN;
1619
de1c301e
LP
1620 e = sd_bus_get_events(bus);
1621 if (e < 0)
1622 return e;
1623
e3017af9
LP
1624 if (need_more)
1625 e |= POLLIN;
1626
1627 r = sd_bus_get_timeout(bus, &until);
1628 if (r < 0)
1629 return r;
1630 if (r == 0)
1631 m = (uint64_t) -1;
1632 else {
1633 usec_t n;
1634 n = now(CLOCK_MONOTONIC);
1635 m = until > n ? until - n : 0;
1636 }
1637
1638 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1639 m = timeout_usec;
1640
de1c301e
LP
1641 zero(p);
1642 p.fd = bus->fd;
1643 p.events = e;
1644
e3017af9 1645 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
de1c301e 1646 if (r < 0)
89ffcd2a 1647 return -errno;
de1c301e 1648
e3017af9
LP
1649 return r > 0 ? 1 : 0;
1650}
1651
1652int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1653
1654 if (!bus)
1655 return -EINVAL;
1656 if (bus->fd < 0)
1657 return -ENOTCONN;
1658 if (bus->rqueue_size > 0)
1659 return 0;
1660
1661 return bus_poll(bus, false, timeout_usec);
de1c301e
LP
1662}
1663
1664int sd_bus_flush(sd_bus *bus) {
1665 int r;
1666
1667 if (!bus)
1668 return -EINVAL;
1669 if (bus->fd < 0)
1670 return -ENOTCONN;
1671
89ffcd2a
LP
1672 r = ensure_running(bus);
1673 if (r < 0)
1674 return r;
1675
1676 if (bus->wqueue_size <= 0)
de1c301e
LP
1677 return 0;
1678
1679 for (;;) {
1680 r = dispatch_wqueue(bus);
1681 if (r < 0)
1682 return r;
1683
89ffcd2a 1684 if (bus->wqueue_size <= 0)
de1c301e
LP
1685 return 0;
1686
e3017af9 1687 r = bus_poll(bus, false, (uint64_t) -1);
de1c301e
LP
1688 if (r < 0)
1689 return r;
1690 }
1691}
1692
1693int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1694 struct filter_callback *f;
1695
1696 if (!bus)
1697 return -EINVAL;
1698 if (!callback)
1699 return -EINVAL;
1700
1701 f = new(struct filter_callback, 1);
1702 if (!f)
1703 return -ENOMEM;
1704 f->callback = callback;
1705 f->userdata = userdata;
1706
1707 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1708 return 0;
1709}
1710
1711int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1712 struct filter_callback *f;
1713
1714 if (!bus)
1715 return -EINVAL;
1716 if (!callback)
1717 return -EINVAL;
1718
1719 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1720 if (f->callback == callback && f->userdata == userdata) {
1721 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1722 free(f);
1723 return 1;
1724 }
1725 }
1726
1727 return 0;
1728}