]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/libsystemd-bus/sd-bus.c
bus: implicitly set no_reply flag on outgoing messages if the serial number is not...
[thirdparty/systemd.git] / src / libsystemd-bus / sd-bus.c
CommitLineData
de1c301e
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2013 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include <endian.h>
23#include <assert.h>
24#include <stdlib.h>
25#include <unistd.h>
26#include <netdb.h>
27#include <sys/poll.h>
28#include <byteswap.h>
29
30#include "util.h"
31#include "macro.h"
32
33#include "sd-bus.h"
34#include "bus-internal.h"
35#include "bus-message.h"
36#include "bus-type.h"
37
38#define WQUEUE_MAX 128
39
e3017af9
LP
40static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41
de1c301e
LP
42static void bus_free(sd_bus *b) {
43 struct filter_callback *f;
89ffcd2a 44 unsigned i;
de1c301e
LP
45
46 assert(b);
47
48 if (b->fd >= 0)
49 close_nointr_nofail(b->fd);
50
51 free(b->rbuffer);
89ffcd2a
LP
52 free(b->unique_name);
53 free(b->auth_uid);
54 free(b->address);
55
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
de1c301e 58 free(b->rqueue);
89ffcd2a
LP
59
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
de1c301e 62 free(b->wqueue);
de1c301e
LP
63
64 hashmap_free_free(b->reply_callbacks);
e3017af9 65 prioq_free(b->reply_callbacks_prioq);
de1c301e
LP
66
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69 free(f);
70 }
71
72 free(b);
73}
74
75static sd_bus* bus_new(void) {
76 sd_bus *r;
77
78 r = new0(sd_bus, 1);
79 if (!r)
80 return NULL;
81
82 r->n_ref = 1;
83 r->fd = -1;
84 r->message_version = 1;
85
86 /* We guarantee that wqueue always has space for at least one
87 * entry */
88 r->wqueue = new(sd_bus_message*, 1);
89 if (!r->wqueue) {
90 free(r);
91 return NULL;
92 }
93
94 return r;
95};
96
e3017af9 97static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
de1c301e
LP
98 const char *s;
99 int r;
100
101 assert(bus);
e3017af9
LP
102
103 if (error != 0)
104 return -error;
105
de1c301e
LP
106 assert(reply);
107
108 bus->state = BUS_RUNNING;
109
110 r = sd_bus_message_read(reply, "s", &s);
111 if (r < 0)
112 return r;
113
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
116 return -ENOMEM;
117
118 return 1;
119}
120
121static int bus_send_hello(sd_bus *bus) {
122 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
123 int r;
124
125 assert(bus);
126
127 r = sd_bus_message_new_method_call(
128 bus,
129 "org.freedesktop.DBus",
130 "/",
131 "org.freedesktop.DBus",
132 "Hello",
133 &m);
134 if (r < 0)
135 return r;
136
e3017af9 137 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
de1c301e
LP
138 if (r < 0)
139 return r;
140
89ffcd2a
LP
141 bus->sent_hello = true;
142 return r;
de1c301e
LP
143}
144
145static int bus_start_running(sd_bus *bus) {
de1c301e
LP
146 assert(bus);
147
89ffcd2a 148 if (bus->sent_hello) {
de1c301e 149 bus->state = BUS_HELLO;
e3017af9 150 return 1;
de1c301e
LP
151 }
152
153 bus->state = BUS_RUNNING;
e3017af9 154 return 1;
de1c301e
LP
155}
156
157static int parse_address_key(const char **p, const char *key, char **value) {
158 size_t l, n = 0;
159 const char *a;
160 char *r = NULL;
161
162 assert(p);
163 assert(*p);
164 assert(key);
165 assert(value);
166
167 l = strlen(key);
89ffcd2a 168 if (strncmp(*p, key, l) != 0)
de1c301e
LP
169 return 0;
170
171 if ((*p)[l] != '=')
172 return 0;
173
174 if (*value)
175 return -EINVAL;
176
177 a = *p + l + 1;
89ffcd2a 178 while (*a != ',' && *a != 0) {
de1c301e
LP
179 char c, *t;
180
181 if (*a == '%') {
182 int x, y;
183
184 x = unhexchar(a[1]);
185 if (x < 0) {
186 free(r);
187 return x;
188 }
189
190 y = unhexchar(a[2]);
191 if (y < 0) {
192 free(r);
193 return y;
194 }
195
de1c301e 196 c = (char) ((x << 4) | y);
89ffcd2a
LP
197 a += 3;
198 } else {
de1c301e 199 c = *a;
89ffcd2a
LP
200 a++;
201 }
de1c301e 202
89ffcd2a 203 t = realloc(r, n + 2);
de1c301e
LP
204 if (!t) {
205 free(r);
206 return -ENOMEM;
207 }
208
209 r = t;
210 r[n++] = c;
211 }
212
89ffcd2a
LP
213 if (!r) {
214 r = strdup("");
215 if (!r)
216 return -ENOMEM;
217 } else
218 r[n] = 0;
219
220 if (*a == ',')
221 a++;
222
de1c301e
LP
223 *p = a;
224 *value = r;
225 return 1;
226}
227
228static void skip_address_key(const char **p) {
229 assert(p);
230 assert(*p);
231
89ffcd2a
LP
232 *p += strcspn(*p, ",");
233
234 if (**p == ',')
235 (*p) ++;
de1c301e
LP
236}
237
238static int bus_parse_next_address(sd_bus *b) {
239 const char *a, *p;
240 _cleanup_free_ char *guid = NULL;
241 int r;
242
243 assert(b);
244
245 if (!b->address)
246 return 0;
247 if (b->address[b->address_index] == 0)
248 return 0;
249
250 a = b->address + b->address_index;
251
252 zero(b->sockaddr);
253 b->sockaddr_size = 0;
254 b->peer = SD_ID128_NULL;
255
256 if (startswith(a, "unix:")) {
257 _cleanup_free_ char *path = NULL, *abstract = NULL;
258
259 p = a + 5;
89ffcd2a 260 while (*p != 0) {
de1c301e
LP
261 r = parse_address_key(&p, "guid", &guid);
262 if (r < 0)
263 return r;
264 else if (r > 0)
265 continue;
266
267 r = parse_address_key(&p, "path", &path);
268 if (r < 0)
269 return r;
270 else if (r > 0)
271 continue;
272
273 r = parse_address_key(&p, "abstract", &abstract);
274 if (r < 0)
275 return r;
276 else if (r > 0)
277 continue;
278
279 skip_address_key(&p);
280 }
281
282 if (!path && !abstract)
283 return -EINVAL;
284
285 if (path && abstract)
286 return -EINVAL;
287
288 if (path) {
289 size_t l;
290
291 l = strlen(path);
292 if (l > sizeof(b->sockaddr.un.sun_path))
293 return -E2BIG;
294
295 b->sockaddr.un.sun_family = AF_UNIX;
296 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298 } else if (abstract) {
299 size_t l;
300
89ffcd2a 301 l = strlen(abstract);
de1c301e
LP
302 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
303 return -E2BIG;
304
305 b->sockaddr.un.sun_family = AF_UNIX;
306 b->sockaddr.un.sun_path[0] = 0;
89ffcd2a 307 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
de1c301e
LP
308 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
309 }
310
311 } else if (startswith(a, "tcp:")) {
312 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313 struct addrinfo hints, *result;
314
315 p = a + 4;
89ffcd2a 316 while (*p != 0) {
de1c301e
LP
317 r = parse_address_key(&p, "guid", &guid);
318 if (r < 0)
319 return r;
320 else if (r > 0)
321 continue;
322
323 r = parse_address_key(&p, "host", &host);
324 if (r < 0)
325 return r;
326 else if (r > 0)
327 continue;
328
329 r = parse_address_key(&p, "port", &port);
330 if (r < 0)
331 return r;
332 else if (r > 0)
333 continue;
334
335 r = parse_address_key(&p, "family", &family);
336 if (r < 0)
337 return r;
338 else if (r > 0)
339 continue;
340
341 skip_address_key(&p);
342 }
343
344 if (!host || !port)
345 return -EINVAL;
346
347 zero(hints);
348 hints.ai_socktype = SOCK_STREAM;
349 hints.ai_flags = AI_ADDRCONFIG;
350
351 if (family) {
352 if (streq(family, "ipv4"))
353 hints.ai_family = AF_INET;
354 else if (streq(family, "ipv6"))
355 hints.ai_family = AF_INET6;
356 else
357 return -EINVAL;
358 }
359
360 r = getaddrinfo(host, port, &hints, &result);
361 if (r == EAI_SYSTEM)
362 return -errno;
363 else if (r != 0)
364 return -EADDRNOTAVAIL;
365
366 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367 b->sockaddr_size = result->ai_addrlen;
368
369 freeaddrinfo(result);
370 }
371
372 if (guid) {
373 r = sd_id128_from_string(guid, &b->peer);
374 if (r < 0)
375 return r;
376 }
377
378 b->address_index = p - b->address;
379 return 1;
380}
381
382static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
383
384 while (size > 0) {
385 struct iovec *i = iov + *idx;
386
387 if (i->iov_len > size) {
388 i->iov_base = (uint8_t*) i->iov_base + size;
389 i->iov_len -= size;
390 return;
391 }
392
393 size -= i->iov_len;
394
395 i->iov_base = NULL;
396 i->iov_len = 0;
397
398 (*idx) ++;
399 }
400}
401
402static int bus_write_auth(sd_bus *b) {
403 struct msghdr mh;
404 ssize_t k;
405
406 assert(b);
407 assert(b->state == BUS_AUTHENTICATING);
408
409 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
410 return 0;
411
e3017af9
LP
412 if (b->auth_timeout == 0)
413 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
414
de1c301e
LP
415 zero(mh);
416 mh.msg_iov = b->auth_iovec + b->auth_index;
417 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
418
419 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
420 if (k < 0)
421 return errno == EAGAIN ? 0 : -errno;
422
423 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
424
425 return 1;
426}
427
428static int bus_auth_verify(sd_bus *b) {
429 char *e, *f;
430 sd_id128_t peer;
431 unsigned i;
432 int r;
433
434 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
435 * that's it */
436
437 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
438 if (!e)
439 return 0;
440
89ffcd2a 441 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
de1c301e
LP
442 if (!f)
443 return 0;
444
445 if (e - (char*) b->rbuffer != 3 + 32)
446 return -EPERM;
447
448 if (memcmp(b->rbuffer, "OK ", 3))
449 return -EPERM;
450
451 for (i = 0; i < 32; i += 2) {
452 int x, y;
453
454 x = unhexchar(((char*) b->rbuffer)[3 + i]);
89ffcd2a 455 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
de1c301e
LP
456
457 if (x < 0 || y < 0)
458 return -EINVAL;
459
460 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
461 }
462
463 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464 !sd_id128_equal(b->peer, peer))
465 return -EPERM;
466
467 b->peer = peer;
468
469 b->can_fds =
470 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
472
e3017af9
LP
473 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474 memmove(b->rbuffer, f + 2, b->rbuffer_size);
89ffcd2a 475
de1c301e
LP
476 r = bus_start_running(b);
477 if (r < 0)
478 return r;
479
480 return 1;
481}
482
483static int bus_read_auth(sd_bus *b) {
484 struct msghdr mh;
485 struct iovec iov;
486 size_t n;
487 ssize_t k;
488 int r;
89ffcd2a 489 void *p;
de1c301e
LP
490
491 assert(b);
492
493 r = bus_auth_verify(b);
494 if (r != 0)
495 return r;
496
497 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
89ffcd2a
LP
498 p = realloc(b->rbuffer, n);
499 if (!p)
500 return -ENOMEM;
501
502 b->rbuffer = p;
de1c301e
LP
503
504 zero(iov);
505 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506 iov.iov_len = n - b->rbuffer_size;
507
508 zero(mh);
509 mh.msg_iov = &iov;
510 mh.msg_iovlen = 1;
511
512 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
513 if (k < 0)
514 return errno == EAGAIN ? 0 : -errno;
515
516 b->rbuffer_size += k;
517
518 r = bus_auth_verify(b);
519 if (r != 0)
520 return r;
521
e3017af9 522 return 1;
de1c301e
LP
523}
524
525static int bus_start_auth(sd_bus *b) {
89ffcd2a 526 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
de1c301e
LP
527 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
528
529 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
530 size_t l;
531
532 assert(b);
533
534 b->state = BUS_AUTHENTICATING;
535
536 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
537 char_array_0(text);
538
539 l = strlen(text);
540 b->auth_uid = hexmem(text, l);
541 if (!b->auth_uid)
542 return -ENOMEM;
543
544 b->auth_iovec[0].iov_base = (void*) auth_prefix;
545 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547 b->auth_iovec[1].iov_len = l * 2;
548 b->auth_iovec[2].iov_base = (void*) auth_suffix;
549 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
551
552 return bus_write_auth(b);
553}
554
555static int bus_start_connect(sd_bus *b) {
556 int r;
557
558 assert(b);
559 assert(b->fd < 0);
560
561 for (;;) {
562 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563 r = bus_parse_next_address(b);
564 if (r < 0)
565 return r;
566 if (r == 0)
e3017af9 567 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
de1c301e
LP
568 }
569
570 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
571 if (b->fd < 0) {
e3017af9 572 b->last_connect_error = errno;
de1c301e
LP
573 zero(b->sockaddr);
574 continue;
575 }
576
577 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
578 if (r < 0) {
579 if (errno == EINPROGRESS)
e3017af9 580 return 1;
de1c301e 581
e3017af9 582 b->last_connect_error = errno;
de1c301e
LP
583 close_nointr_nofail(b->fd);
584 b->fd = -1;
585 zero(b->sockaddr);
586 continue;
587 }
588
589 return bus_start_auth(b);
590 }
591}
592
593int sd_bus_open_system(sd_bus **ret) {
594 const char *e;
595 sd_bus *b;
596 int r;
597
598 if (!ret)
599 return -EINVAL;
600
601 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
602 if (e) {
603 r = sd_bus_open_address(e, &b);
604 if (r < 0)
605 return r;
89ffcd2a
LP
606 } else {
607 b = bus_new();
608 if (!b)
609 return -ENOMEM;
de1c301e 610
89ffcd2a
LP
611 b->sockaddr.un.sun_family = AF_UNIX;
612 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
de1c301e 614
89ffcd2a
LP
615 r = bus_start_connect(b);
616 if (r < 0) {
617 bus_free(b);
618 return r;
619 }
620 }
de1c301e 621
89ffcd2a 622 r = bus_send_hello(b);
de1c301e 623 if (r < 0) {
89ffcd2a 624 sd_bus_unref(b);
de1c301e
LP
625 return r;
626 }
627
628 *ret = b;
629 return 0;
630}
631
632int sd_bus_open_user(sd_bus **ret) {
633 const char *e;
634 sd_bus *b;
635 size_t l;
636 int r;
637
638 if (!ret)
639 return -EINVAL;
640
641 e = getenv("DBUS_SESSION_BUS_ADDRESS");
642 if (e) {
643 r = sd_bus_open_address(e, &b);
644 if (r < 0)
645 return r;
89ffcd2a
LP
646 } else {
647 e = getenv("XDG_RUNTIME_DIR");
648 if (!e)
649 return -ENOENT;
de1c301e 650
89ffcd2a
LP
651 l = strlen(e);
652 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
653 return -E2BIG;
de1c301e 654
89ffcd2a
LP
655 b = bus_new();
656 if (!b)
657 return -ENOMEM;
de1c301e 658
89ffcd2a
LP
659 b->sockaddr.un.sun_family = AF_UNIX;
660 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
de1c301e 662
89ffcd2a
LP
663 r = bus_start_connect(b);
664 if (r < 0) {
665 bus_free(b);
666 return r;
667 }
668 }
de1c301e 669
89ffcd2a 670 r = bus_send_hello(b);
de1c301e 671 if (r < 0) {
89ffcd2a 672 sd_bus_unref(b);
de1c301e
LP
673 return r;
674 }
675
676 *ret = b;
677 return 0;
678}
679
680int sd_bus_open_address(const char *address, sd_bus **ret) {
681 sd_bus *b;
682 int r;
683
684 if (!address)
685 return -EINVAL;
686 if (!ret)
687 return -EINVAL;
688
689 b = bus_new();
690 if (!b)
691 return -ENOMEM;
692
693 b->address = strdup(address);
694 if (!b->address) {
695 bus_free(b);
696 return -ENOMEM;
697 }
698
699 r = bus_start_connect(b);
700 if (r < 0) {
701 bus_free(b);
702 return r;
703 }
704
705 *ret = b;
706 return 0;
707}
708
709int sd_bus_open_fd(int fd, sd_bus **ret) {
710 sd_bus *b;
711 int r;
712
713 if (fd < 0)
714 return -EINVAL;
715 if (!ret)
716 return -EINVAL;
717
718 b = bus_new();
719 if (!b)
720 return -ENOMEM;
721
722 b->fd = fd;
723 fd_nonblock(b->fd, true);
724 fd_cloexec(b->fd, true);
725
726 r = bus_start_auth(b);
727 if (r < 0) {
728 bus_free(b);
729 return r;
730 }
731
732 *ret = b;
733 return 0;
734}
735
736void sd_bus_close(sd_bus *bus) {
737 if (!bus)
738 return;
739 if (bus->fd < 0)
740 return;
741
742 close_nointr_nofail(bus->fd);
743 bus->fd = -1;
744}
745
746sd_bus *sd_bus_ref(sd_bus *bus) {
747 if (!bus)
748 return NULL;
749
750 assert(bus->n_ref > 0);
751
752 bus->n_ref++;
753 return bus;
754}
755
756sd_bus *sd_bus_unref(sd_bus *bus) {
757 if (!bus)
758 return NULL;
759
760 assert(bus->n_ref > 0);
761 bus->n_ref--;
762
763 if (bus->n_ref <= 0)
764 bus_free(bus);
765
766 return NULL;
767}
768
e3017af9
LP
769int sd_bus_is_open(sd_bus *bus) {
770 if (!bus)
771 return -EINVAL;
772
773 return bus->fd >= 0;
774}
775
de1c301e
LP
776int sd_bus_is_running(sd_bus *bus) {
777 if (!bus)
778 return -EINVAL;
779
780 if (bus->fd < 0)
781 return -ENOTCONN;
782
783 return bus->state == BUS_RUNNING;
784}
785
786int sd_bus_can_send(sd_bus *bus, char type) {
787
788 if (!bus)
789 return -EINVAL;
89ffcd2a
LP
790 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
791 return -EAGAIN;
de1c301e
LP
792
793 if (type == SD_BUS_TYPE_UNIX_FD)
794 return bus->can_fds;
795
796 return bus_type_is_valid(type);
797}
798
799static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
800 assert(m);
801
89ffcd2a
LP
802 if (m->header->version > b->message_version)
803 return -EPERM;
804
de1c301e
LP
805 if (m->sealed)
806 return 0;
807
9a17484d 808 return bus_message_seal(m, ++b->serial);
de1c301e
LP
809}
810
811static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
812 struct msghdr mh;
813 struct iovec *iov;
814 ssize_t k;
815 size_t n;
816 unsigned j;
817
818 assert(bus);
819 assert(m);
820 assert(idx);
89ffcd2a 821 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 822
e3017af9
LP
823 if (*idx >= m->size)
824 return 0;
825
de1c301e
LP
826 n = m->n_iovec * sizeof(struct iovec);
827 iov = alloca(n);
828 memcpy(iov, m->iovec, n);
829
830 j = 0;
831 iovec_advance(iov, &j, *idx);
832
833 zero(mh);
834 mh.msg_iov = iov;
835 mh.msg_iovlen = m->n_iovec;
836
837 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
838 if (k < 0)
e3017af9 839 return errno == EAGAIN ? 0 : -errno;
de1c301e
LP
840
841 *idx += (size_t) k;
e3017af9 842 return 1;
de1c301e
LP
843}
844
845static int message_read_need(sd_bus *bus, size_t *need) {
846 uint32_t a, b;
847 uint8_t e;
848
849 assert(bus);
850 assert(need);
89ffcd2a 851 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 852
89ffcd2a 853 if (bus->rbuffer_size < sizeof(struct bus_header)) {
e3017af9
LP
854 *need = sizeof(struct bus_header) + 8;
855
856 /* Minimum message size:
857 *
858 * Header +
859 *
860 * Method Call: +2 string headers
861 * Signal: +3 string headers
862 * Method Error: +1 string headers
863 * +1 uint32 headers
864 * Method Reply: +1 uint32 headers
865 *
866 * A string header is at least 9 bytes
867 * A uint32 header is at least 8 bytes
868 *
869 * Hence the minimum message size of a valid message
870 * is header + 8 bytes */
871
de1c301e
LP
872 return 0;
873 }
874
875 a = ((const uint32_t*) bus->rbuffer)[1];
876 b = ((const uint32_t*) bus->rbuffer)[3];
877
878 e = ((const uint8_t*) bus->rbuffer)[0];
879 if (e == SD_BUS_LITTLE_ENDIAN) {
880 a = le32toh(a);
881 b = le32toh(b);
882 } else if (e == SD_BUS_BIG_ENDIAN) {
883 a = be32toh(a);
884 b = be32toh(b);
885 } else
89ffcd2a 886 return -EBADMSG;
de1c301e 887
89ffcd2a 888 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
de1c301e
LP
889 return 0;
890}
891
892static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
893 sd_bus_message *t;
894 void *b = NULL;
895 int r;
896
897 assert(bus);
898 assert(m);
899 assert(bus->rbuffer_size >= size);
89ffcd2a 900 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e 901
de1c301e
LP
902 if (bus->rbuffer_size > size) {
903 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
904 if (!b) {
905 free(t);
906 return -ENOMEM;
907 }
908 }
909
80a46c73
LP
910 r = bus_message_from_malloc(bus->rbuffer, size, &t);
911 if (r < 0) {
912 free(b);
913 return r;
914 }
de1c301e
LP
915
916 bus->rbuffer = b;
917 bus->rbuffer_size -= size;
918
de1c301e
LP
919 *m = t;
920 return 1;
921}
922
923static int message_read(sd_bus *bus, sd_bus_message **m) {
924 struct msghdr mh;
925 struct iovec iov;
926 ssize_t k;
927 size_t need;
928 int r;
929 void *b;
930
931 assert(bus);
932 assert(m);
89ffcd2a 933 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
934
935 r = message_read_need(bus, &need);
936 if (r < 0)
937 return r;
938
939 if (bus->rbuffer_size >= need)
940 return message_make(bus, need, m);
941
942 b = realloc(bus->rbuffer, need);
943 if (!b)
944 return -ENOMEM;
945
89ffcd2a
LP
946 bus->rbuffer = b;
947
de1c301e
LP
948 zero(iov);
949 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950 iov.iov_len = need - bus->rbuffer_size;
951
952 zero(mh);
953 mh.msg_iov = &iov;
954 mh.msg_iovlen = 1;
955
956 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
957 if (k < 0)
958 return errno == EAGAIN ? 0 : -errno;
959
960 bus->rbuffer_size += k;
961
962 r = message_read_need(bus, &need);
963 if (r < 0)
964 return r;
965
966 if (bus->rbuffer_size >= need)
967 return message_make(bus, need, m);
968
e3017af9 969 return 1;
de1c301e
LP
970}
971
972static int dispatch_wqueue(sd_bus *bus) {
e3017af9 973 int r, ret = 0;
de1c301e
LP
974
975 assert(bus);
89ffcd2a 976 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
977
978 if (bus->fd < 0)
979 return -ENOTCONN;
980
981 while (bus->wqueue_size > 0) {
982
983 r = message_write(bus, bus->wqueue[0], &bus->windex);
984 if (r < 0) {
985 sd_bus_close(bus);
986 return r;
987 } else if (r == 0)
e3017af9
LP
988 /* Didn't do anything this time */
989 return ret;
990 else if (bus->windex >= bus->wqueue[0]->size) {
de1c301e
LP
991 /* Fully written. Let's drop the entry from
992 * the queue.
993 *
994 * This isn't particularly optimized, but
995 * well, this is supposed to be our worst-case
996 * buffer only, and the socket buffer is
997 * supposed to be our primary buffer, and if
998 * it got full, then all bets are off
999 * anyway. */
1000
1001 sd_bus_message_unref(bus->wqueue[0]);
1002 bus->wqueue_size --;
1003 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1004 bus->windex = 0;
1005
e3017af9 1006 ret = 1;
de1c301e
LP
1007 }
1008 }
1009
e3017af9 1010 return ret;
de1c301e
LP
1011}
1012
1013static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
e3017af9
LP
1014 sd_bus_message *z;
1015 int r, ret = 0;
de1c301e
LP
1016
1017 assert(bus);
1018 assert(m);
89ffcd2a 1019 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
de1c301e
LP
1020
1021 if (bus->fd < 0)
1022 return -ENOTCONN;
1023
1024 if (bus->rqueue_size > 0) {
1025 /* Dispatch a queued message */
1026
1027 *m = bus->rqueue[0];
1028 bus->rqueue_size --;
1029 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1030 return 1;
1031 }
1032
1033 /* Try to read a new message */
e3017af9
LP
1034 do {
1035 r = message_read(bus, &z);
1036 if (r < 0) {
1037 sd_bus_close(bus);
1038 return r;
1039 }
1040 if (r == 0)
1041 return ret;
de1c301e 1042
e3017af9
LP
1043 r = 1;
1044 } while (!z);
1045
1046 *m = z;
1047 return 1;
de1c301e
LP
1048}
1049
1050int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1051 int r;
1052
1053 if (!bus)
1054 return -EINVAL;
1055 if (bus->fd < 0)
1056 return -ENOTCONN;
1057 if (!m)
1058 return -EINVAL;
de1c301e 1059
29f6aadd
LP
1060 /* If the serial number isn't kept, then we know that no reply
1061 * is expected */
1062 if (!serial && !m->sealed)
1063 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1064
de1c301e
LP
1065 r = bus_seal_message(bus, m);
1066 if (r < 0)
1067 return r;
1068
5407f2de
LP
1069 /* If this is a reply and no reply was requested, then let's
1070 * suppress this, if we can */
1071 if (m->dont_send && !serial)
1072 return 0;
1073
89ffcd2a 1074 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
de1c301e
LP
1075 size_t idx = 0;
1076
1077 r = message_write(bus, m, &idx);
1078 if (r < 0) {
1079 sd_bus_close(bus);
1080 return r;
e3017af9 1081 } else if (idx < m->size) {
de1c301e
LP
1082 /* Wasn't fully written. So let's remember how
1083 * much was written. Note that the first entry
1084 * of the wqueue array is always allocated so
1085 * that we always can remember how much was
1086 * written. */
1087 bus->wqueue[0] = sd_bus_message_ref(m);
1088 bus->wqueue_size = 1;
1089 bus->windex = idx;
1090 }
1091 } else {
1092 sd_bus_message **q;
1093
1094 /* Just append it to the queue. */
1095
1096 if (bus->wqueue_size >= WQUEUE_MAX)
1097 return -ENOBUFS;
1098
1099 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1100 if (!q)
1101 return -ENOMEM;
1102
1103 bus->wqueue = q;
1104 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1105 }
1106
1107 if (serial)
1108 *serial = BUS_MESSAGE_SERIAL(m);
1109
1110 return 0;
1111}
1112
1113static usec_t calc_elapse(uint64_t usec) {
1114 if (usec == (uint64_t) -1)
1115 return 0;
1116
1117 if (usec == 0)
e3017af9 1118 usec = BUS_DEFAULT_TIMEOUT;
de1c301e
LP
1119
1120 return now(CLOCK_MONOTONIC) + usec;
1121}
1122
e3017af9
LP
1123static int timeout_compare(const void *a, const void *b) {
1124 const struct reply_callback *x = a, *y = b;
1125
1126 if (x->timeout != 0 && y->timeout == 0)
1127 return -1;
1128
1129 if (x->timeout == 0 && y->timeout != 0)
1130 return 1;
1131
1132 if (x->timeout < y->timeout)
1133 return -1;
1134
1135 if (x->timeout > y->timeout)
1136 return 1;
1137
1138 return 0;
1139}
1140
de1c301e
LP
1141int sd_bus_send_with_reply(
1142 sd_bus *bus,
1143 sd_bus_message *m,
1144 sd_message_handler_t callback,
1145 void *userdata,
1146 uint64_t usec,
1147 uint64_t *serial) {
1148
1149 struct reply_callback *c;
1150 int r;
1151
1152 if (!bus)
1153 return -EINVAL;
89ffcd2a 1154 if (bus->fd < 0)
de1c301e
LP
1155 return -ENOTCONN;
1156 if (!m)
1157 return -EINVAL;
1158 if (!callback)
1159 return -EINVAL;
89ffcd2a 1160 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1161 return -EINVAL;
89ffcd2a
LP
1162 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1163 return -EINVAL;
1164
1165 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1166 if (r < 0)
1167 return r;
de1c301e 1168
e3017af9
LP
1169 if (usec != (uint64_t) -1) {
1170 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1171 if (r < 0)
1172 return r;
1173 }
1174
de1c301e
LP
1175 r = bus_seal_message(bus, m);
1176 if (r < 0)
1177 return r;
1178
1179 c = new(struct reply_callback, 1);
1180 if (!c)
1181 return -ENOMEM;
1182
1183 c->callback = callback;
1184 c->userdata = userdata;
1185 c->serial = BUS_MESSAGE_SERIAL(m);
1186 c->timeout = calc_elapse(usec);
1187
1188 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1189 if (r < 0) {
1190 free(c);
1191 return r;
1192 }
1193
e3017af9
LP
1194 if (c->timeout != 0) {
1195 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1196 if (r < 0) {
1197 c->timeout = 0;
1198 sd_bus_send_with_reply_cancel(bus, c->serial);
1199 return r;
1200 }
1201 }
1202
de1c301e
LP
1203 r = sd_bus_send(bus, m, serial);
1204 if (r < 0) {
e3017af9 1205 sd_bus_send_with_reply_cancel(bus, c->serial);
de1c301e
LP
1206 return r;
1207 }
1208
1209 return r;
1210}
1211
1212int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
e3017af9 1213 struct reply_callback *c;
de1c301e
LP
1214
1215 if (!bus)
1216 return -EINVAL;
1217 if (serial == 0)
1218 return -EINVAL;
1219
1220 c = hashmap_remove(bus->reply_callbacks, &serial);
1221 if (!c)
1222 return 0;
1223
e3017af9
LP
1224 if (c->timeout != 0)
1225 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1226
de1c301e
LP
1227 free(c);
1228 return 1;
1229}
1230
89ffcd2a
LP
1231static int ensure_running(sd_bus *bus) {
1232 int r;
1233
1234 assert(bus);
1235
1236 r = sd_bus_is_running(bus);
1237 if (r != 0)
1238 return r;
1239
1240 for (;;) {
e3017af9
LP
1241 int k;
1242
89ffcd2a 1243 r = sd_bus_process(bus, NULL);
e3017af9 1244
89ffcd2a
LP
1245 if (r < 0)
1246 return r;
1247
e3017af9
LP
1248 k = sd_bus_is_running(bus);
1249 if (k != 0)
1250 return k;
1251
1252 if (r > 0)
1253 continue;
89ffcd2a
LP
1254
1255 r = sd_bus_wait(bus, (uint64_t) -1);
1256 if (r < 0)
1257 return r;
1258 }
1259}
1260
de1c301e
LP
1261int sd_bus_send_with_reply_and_block(
1262 sd_bus *bus,
1263 sd_bus_message *m,
1264 uint64_t usec,
1265 sd_bus_error *error,
1266 sd_bus_message **reply) {
1267
1268 int r;
1269 usec_t timeout;
1270 uint64_t serial;
1271 bool room = false;
1272
1273 if (!bus)
1274 return -EINVAL;
89ffcd2a 1275 if (bus->fd < 0)
de1c301e
LP
1276 return -ENOTCONN;
1277 if (!m)
1278 return -EINVAL;
89ffcd2a 1279 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
de1c301e 1280 return -EINVAL;
89ffcd2a 1281 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
de1c301e 1282 return -EINVAL;
89ffcd2a
LP
1283 if (bus_error_is_dirty(error))
1284 return -EINVAL;
1285
1286 r = ensure_running(bus);
1287 if (r < 0)
1288 return r;
de1c301e
LP
1289
1290 r = sd_bus_send(bus, m, &serial);
1291 if (r < 0)
1292 return r;
1293
1294 timeout = calc_elapse(usec);
1295
1296 for (;;) {
1297 usec_t left;
e3017af9 1298 sd_bus_message *incoming = NULL;
de1c301e
LP
1299
1300 if (!room) {
1301 sd_bus_message **q;
1302
1303 /* Make sure there's room for queuing this
1304 * locally, before we read the message */
1305
1306 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1307 if (!q)
1308 return -ENOMEM;
1309
1310 bus->rqueue = q;
1311 room = true;
1312 }
1313
1314 r = message_read(bus, &incoming);
1315 if (r < 0)
1316 return r;
e3017af9 1317 if (incoming) {
89ffcd2a 1318
de1c301e
LP
1319 if (incoming->reply_serial == serial) {
1320 /* Found a match! */
1321
1322 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1323 *reply = incoming;
1324 return 0;
1325 }
1326
1327 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1328 int k;
1329
1330 r = sd_bus_error_copy(error, &incoming->error);
1331 if (r < 0) {
1332 sd_bus_message_unref(incoming);
1333 return r;
1334 }
1335
1336 k = bus_error_to_errno(&incoming->error);
1337 sd_bus_message_unref(incoming);
1338 return k;
1339 }
1340
1341 sd_bus_message_unref(incoming);
1342 return -EIO;
1343 }
1344
1345 /* There's already guaranteed to be room for
1346 * this, so need to resize things here */
1347 bus->rqueue[bus->rqueue_size ++] = incoming;
1348 room = false;
1349
1350 /* Try to read more, right-away */
1351 continue;
1352 }
e3017af9
LP
1353 if (r != 0)
1354 continue;
de1c301e
LP
1355
1356 if (timeout > 0) {
1357 usec_t n;
1358
1359 n = now(CLOCK_MONOTONIC);
1360 if (n >= timeout)
1361 return -ETIMEDOUT;
1362
1363 left = timeout - n;
1364 } else
1365 left = (uint64_t) -1;
1366
e3017af9 1367 r = bus_poll(bus, true, left);
de1c301e
LP
1368 if (r < 0)
1369 return r;
1370
1371 r = dispatch_wqueue(bus);
1372 if (r < 0)
1373 return r;
1374 }
1375}
1376
1377int sd_bus_get_fd(sd_bus *bus) {
1378 if (!bus)
1379 return -EINVAL;
1380
1381 if (bus->fd < 0)
89ffcd2a 1382 return -ENOTCONN;
de1c301e
LP
1383
1384 return bus->fd;
1385}
1386
1387int sd_bus_get_events(sd_bus *bus) {
1388 int flags = 0;
1389
1390 if (!bus)
1391 return -EINVAL;
de1c301e 1392 if (bus->fd < 0)
89ffcd2a 1393 return -ENOTCONN;
de1c301e
LP
1394
1395 if (bus->state == BUS_OPENING)
1396 flags |= POLLOUT;
89ffcd2a
LP
1397 else if (bus->state == BUS_AUTHENTICATING) {
1398
1399 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1400 flags |= POLLOUT;
1401
1402 flags |= POLLIN;
1403
1404 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
de1c301e
LP
1405 if (bus->rqueue_size <= 0)
1406 flags |= POLLIN;
1407 if (bus->wqueue_size > 0)
1408 flags |= POLLOUT;
1409 }
1410
1411 return flags;
1412}
1413
e3017af9
LP
1414int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1415 struct reply_callback *c;
1416
1417 if (!bus)
1418 return -EINVAL;
1419 if (!timeout_usec)
1420 return -EINVAL;
1421 if (bus->fd < 0)
1422 return -ENOTCONN;
1423
1424 if (bus->state == BUS_AUTHENTICATING) {
1425 *timeout_usec = bus->auth_timeout;
1426 return 1;
1427 }
1428
1429 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1430 return 0;
1431
1432 c = prioq_peek(bus->reply_callbacks_prioq);
1433 if (!c)
1434 return 0;
1435
1436 *timeout_usec = c->timeout;
1437 return 1;
1438}
1439
1440static int process_timeout(sd_bus *bus) {
1441 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1442 struct reply_callback *c;
1443 usec_t n;
1444 int r;
1445
1446 assert(bus);
1447
1448 c = prioq_peek(bus->reply_callbacks_prioq);
1449 if (!c)
1450 return 0;
1451
1452 n = now(CLOCK_MONOTONIC);
1453 if (c->timeout > n)
1454 return 0;
1455
1456 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1457 hashmap_remove(bus->reply_callbacks, &c->serial);
1458
1459 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1460 free(c);
1461
1462 return r < 0 ? r : 1;
1463}
1464
1465static int process_message(sd_bus *bus, sd_bus_message *m) {
1466 struct filter_callback *l;
1467 int r;
1468
1469 assert(bus);
1470 assert(m);
1471
1472 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1473 struct reply_callback *c;
1474
1475 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1476 if (c) {
1477 if (c->timeout != 0)
1478 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1479
1480 r = c->callback(bus, 0, m, c->userdata);
1481 free(c);
1482
1483 if (r != 0)
1484 return r;
1485 }
1486 }
1487
1488 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1489 r = l->callback(bus, 0, m, l->userdata);
1490 if (r != 0)
1491 return r;
1492 }
1493
1494 return 0;
1495}
1496
de1c301e 1497int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
de1c301e
LP
1498 int r;
1499
e3017af9
LP
1500 /* Returns 0 when we didn't do anything. This should cause the
1501 * caller to invoke sd_bus_wait() before returning the next
1502 * time. Returns > 0 when we did something, which possibly
1503 * means *ret is filled in with an unprocessed message. */
1504
de1c301e
LP
1505 if (!bus)
1506 return -EINVAL;
1507 if (bus->fd < 0)
1508 return -ENOTCONN;
1509
1510 if (bus->state == BUS_OPENING) {
1511 struct pollfd p;
1512
1513 zero(p);
1514 p.fd = bus->fd;
1515 p.events = POLLOUT;
1516
1517 r = poll(&p, 1, 0);
1518 if (r < 0)
1519 return -errno;
1520
1521 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
89ffcd2a 1522 int error = 0;
de1c301e
LP
1523 socklen_t slen = sizeof(error);
1524
1525 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1526 if (r < 0)
e3017af9
LP
1527 bus->last_connect_error = errno;
1528 else if (error != 0)
1529 bus->last_connect_error = error;
de1c301e 1530 else if (p.revents & (POLLERR|POLLHUP))
e3017af9
LP
1531 bus->last_connect_error = ECONNREFUSED;
1532 else {
1533 r = bus_start_auth(bus);
1534 goto null_message;
1535 }
de1c301e
LP
1536
1537 /* Try next address */
e3017af9
LP
1538 r = bus_start_connect(bus);
1539 goto null_message;
de1c301e
LP
1540 }
1541
e3017af9
LP
1542 r = 0;
1543 goto null_message;
de1c301e
LP
1544
1545 } else if (bus->state == BUS_AUTHENTICATING) {
1546
e3017af9
LP
1547 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1548 return -ETIMEDOUT;
1549
de1c301e 1550 r = bus_write_auth(bus);
e3017af9
LP
1551 if (r != 0)
1552 goto null_message;
de1c301e
LP
1553
1554 r = bus_read_auth(bus);
e3017af9 1555 goto null_message;
de1c301e
LP
1556
1557 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
89ffcd2a 1558 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
e3017af9
LP
1559 int k;
1560
1561 r = process_timeout(bus);
1562 if (r != 0)
1563 goto null_message;
de1c301e
LP
1564
1565 r = dispatch_wqueue(bus);
e3017af9
LP
1566 if (r != 0)
1567 goto null_message;
de1c301e 1568
e3017af9 1569 k = r;
de1c301e 1570 r = dispatch_rqueue(bus, &m);
e3017af9 1571 if (r < 0)
de1c301e 1572 return r;
e3017af9
LP
1573 if (!m) {
1574 if (r == 0)
1575 r = k;
1576 goto null_message;
de1c301e
LP
1577 }
1578
e3017af9
LP
1579 r = process_message(bus, m);
1580 if (r != 0)
1581 goto null_message;
de1c301e
LP
1582
1583 if (ret) {
1584 *ret = m;
89ffcd2a 1585 m = NULL;
de1c301e
LP
1586 return 1;
1587 }
1588
89ffcd2a
LP
1589 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1590 const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1591 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1592
1593 r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1594 if (r < 0)
1595 return r;
1596
1597 r = sd_bus_send(bus, reply, NULL);
1598 if (r < 0)
1599 return r;
1600 }
1601
e3017af9 1602 return 1;
de1c301e
LP
1603 }
1604
89ffcd2a 1605 assert_not_reached("Unknown state");
e3017af9
LP
1606
1607null_message:
1608 if (r >= 0 && ret)
1609 *ret = NULL;
1610
1611 return r;
de1c301e
LP
1612}
1613
e3017af9 1614static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
de1c301e
LP
1615 struct pollfd p;
1616 int r, e;
1617 struct timespec ts;
e3017af9
LP
1618 usec_t until, m;
1619
1620 assert(bus);
de1c301e 1621
de1c301e 1622 if (bus->fd < 0)
89ffcd2a
LP
1623 return -ENOTCONN;
1624
de1c301e
LP
1625 e = sd_bus_get_events(bus);
1626 if (e < 0)
1627 return e;
1628
e3017af9
LP
1629 if (need_more)
1630 e |= POLLIN;
1631
1632 r = sd_bus_get_timeout(bus, &until);
1633 if (r < 0)
1634 return r;
1635 if (r == 0)
1636 m = (uint64_t) -1;
1637 else {
1638 usec_t n;
1639 n = now(CLOCK_MONOTONIC);
1640 m = until > n ? until - n : 0;
1641 }
1642
1643 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1644 m = timeout_usec;
1645
de1c301e
LP
1646 zero(p);
1647 p.fd = bus->fd;
1648 p.events = e;
1649
e3017af9 1650 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
de1c301e 1651 if (r < 0)
89ffcd2a 1652 return -errno;
de1c301e 1653
e3017af9
LP
1654 return r > 0 ? 1 : 0;
1655}
1656
1657int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1658
1659 if (!bus)
1660 return -EINVAL;
1661 if (bus->fd < 0)
1662 return -ENOTCONN;
1663 if (bus->rqueue_size > 0)
1664 return 0;
1665
1666 return bus_poll(bus, false, timeout_usec);
de1c301e
LP
1667}
1668
1669int sd_bus_flush(sd_bus *bus) {
1670 int r;
1671
1672 if (!bus)
1673 return -EINVAL;
1674 if (bus->fd < 0)
1675 return -ENOTCONN;
1676
89ffcd2a
LP
1677 r = ensure_running(bus);
1678 if (r < 0)
1679 return r;
1680
1681 if (bus->wqueue_size <= 0)
de1c301e
LP
1682 return 0;
1683
1684 for (;;) {
1685 r = dispatch_wqueue(bus);
1686 if (r < 0)
1687 return r;
1688
89ffcd2a 1689 if (bus->wqueue_size <= 0)
de1c301e
LP
1690 return 0;
1691
e3017af9 1692 r = bus_poll(bus, false, (uint64_t) -1);
de1c301e
LP
1693 if (r < 0)
1694 return r;
1695 }
1696}
1697
1698int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1699 struct filter_callback *f;
1700
1701 if (!bus)
1702 return -EINVAL;
1703 if (!callback)
1704 return -EINVAL;
1705
1706 f = new(struct filter_callback, 1);
1707 if (!f)
1708 return -ENOMEM;
1709 f->callback = callback;
1710 f->userdata = userdata;
1711
1712 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1713 return 0;
1714}
1715
1716int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1717 struct filter_callback *f;
1718
1719 if (!bus)
1720 return -EINVAL;
1721 if (!callback)
1722 return -EINVAL;
1723
1724 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1725 if (f->callback == callback && f->userdata == userdata) {
1726 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1727 free(f);
1728 return 1;
1729 }
1730 }
1731
1732 return 0;
1733}