1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
38 #include "bus-socket.h"
40 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
);
42 static void bus_free(sd_bus
*b
) {
43 struct filter_callback
*f
;
44 struct object_callback
*c
;
50 close_nointr_nofail(b
->fd
);
58 strv_free(b
->exec_argv
);
60 close_many(b
->fds
, b
->n_fds
);
63 for (i
= 0; i
< b
->rqueue_size
; i
++)
64 sd_bus_message_unref(b
->rqueue
[i
]);
67 for (i
= 0; i
< b
->wqueue_size
; i
++)
68 sd_bus_message_unref(b
->wqueue
[i
]);
71 hashmap_free_free(b
->reply_callbacks
);
72 prioq_free(b
->reply_callbacks_prioq
);
74 while ((f
= b
->filter_callbacks
)) {
75 LIST_REMOVE(struct filter_callback
, callbacks
, b
->filter_callbacks
, f
);
79 while ((c
= hashmap_steal_first(b
->object_callbacks
))) {
84 hashmap_free(b
->object_callbacks
);
89 int sd_bus_new(sd_bus
**ret
) {
101 r
->message_version
= 1;
102 r
->negotiate_fds
= true;
104 /* We guarantee that wqueue always has space for at least one entry */
106 r
->wqueue
= new(sd_bus_message
*, 1);
116 int sd_bus_set_address(sd_bus
*bus
, const char *address
) {
121 if (bus
->state
!= BUS_UNSET
)
136 int sd_bus_set_fd(sd_bus
*bus
, int fd
) {
139 if (bus
->state
!= BUS_UNSET
)
148 int sd_bus_set_exec(sd_bus
*bus
, const char *path
, char *const argv
[]) {
153 if (bus
->state
!= BUS_UNSET
)
157 if (strv_isempty(argv
))
170 free(bus
->exec_path
);
171 strv_free(bus
->exec_argv
);
179 int sd_bus_set_hello(sd_bus
*bus
, int b
) {
182 if (bus
->state
!= BUS_UNSET
)
185 bus
->send_hello
= !!b
;
189 int sd_bus_set_negotiate_fds(sd_bus
*bus
, int b
) {
192 if (bus
->state
!= BUS_UNSET
)
195 bus
->negotiate_fds
= !!b
;
199 static int hello_callback(sd_bus
*bus
, int error
, sd_bus_message
*reply
, void *userdata
) {
204 assert(bus
->state
== BUS_HELLO
);
211 r
= sd_bus_message_read(reply
, "s", &s
);
215 if (!service_name_is_valid(s
) || s
[0] != ':')
218 bus
->unique_name
= strdup(s
);
219 if (!bus
->unique_name
)
222 bus
->state
= BUS_RUNNING
;
227 static int bus_send_hello(sd_bus
*bus
) {
228 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
233 if (!bus
->send_hello
)
236 r
= sd_bus_message_new_method_call(
238 "org.freedesktop.DBus",
240 "org.freedesktop.DBus",
246 r
= sd_bus_send_with_reply(bus
, m
, hello_callback
, NULL
, 0, NULL
);
253 int bus_start_running(sd_bus
*bus
) {
256 if (bus
->send_hello
) {
257 bus
->state
= BUS_HELLO
;
261 bus
->state
= BUS_RUNNING
;
265 static int parse_address_key(const char **p
, const char *key
, char **value
) {
276 if (strncmp(*p
, key
, l
) != 0)
289 while (*a
!= ';' && *a
!= ',' && *a
!= 0) {
307 c
= (char) ((x
<< 4) | y
);
314 t
= realloc(r
, n
+ 2);
342 static void skip_address_key(const char **p
) {
346 *p
+= strcspn(*p
, ",");
352 static int parse_unix_address(sd_bus
*b
, const char **p
, char **guid
) {
353 _cleanup_free_
char *path
= NULL
, *abstract
= NULL
;
362 while (**p
!= 0 && **p
!= ';') {
363 r
= parse_address_key(p
, "guid", guid
);
369 r
= parse_address_key(p
, "path", &path
);
375 r
= parse_address_key(p
, "abstract", &abstract
);
384 if (!path
&& !abstract
)
387 if (path
&& abstract
)
392 if (l
> sizeof(b
->sockaddr
.un
.sun_path
))
395 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
396 strncpy(b
->sockaddr
.un
.sun_path
, path
, sizeof(b
->sockaddr
.un
.sun_path
));
397 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
;
398 } else if (abstract
) {
399 l
= strlen(abstract
);
400 if (l
> sizeof(b
->sockaddr
.un
.sun_path
) - 1)
403 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
404 b
->sockaddr
.un
.sun_path
[0] = 0;
405 strncpy(b
->sockaddr
.un
.sun_path
+1, abstract
, sizeof(b
->sockaddr
.un
.sun_path
)-1);
406 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + 1 + l
;
412 static int parse_tcp_address(sd_bus
*b
, const char **p
, char **guid
) {
413 _cleanup_free_
char *host
= NULL
, *port
= NULL
, *family
= NULL
;
414 struct addrinfo hints
, *result
;
422 while (**p
!= 0 && **p
!= ';') {
423 r
= parse_address_key(p
, "guid", guid
);
429 r
= parse_address_key(p
, "host", &host
);
435 r
= parse_address_key(p
, "port", &port
);
441 r
= parse_address_key(p
, "family", &family
);
454 hints
.ai_socktype
= SOCK_STREAM
;
455 hints
.ai_flags
= AI_ADDRCONFIG
;
458 if (streq(family
, "ipv4"))
459 hints
.ai_family
= AF_INET
;
460 else if (streq(family
, "ipv6"))
461 hints
.ai_family
= AF_INET6
;
466 r
= getaddrinfo(host
, port
, &hints
, &result
);
470 return -EADDRNOTAVAIL
;
472 memcpy(&b
->sockaddr
, result
->ai_addr
, result
->ai_addrlen
);
473 b
->sockaddr_size
= result
->ai_addrlen
;
475 freeaddrinfo(result
);
480 static int parse_exec_address(sd_bus
*b
, const char **p
, char **guid
) {
482 unsigned n_argv
= 0, j
;
491 while (**p
!= 0 && **p
!= ';') {
492 r
= parse_address_key(p
, "guid", guid
);
498 r
= parse_address_key(p
, "path", &path
);
504 if (startswith(*p
, "argv")) {
508 ul
= strtoul(*p
+ 4, (char**) p
, 10);
509 if (errno
!= 0 || **p
!= '=' || ul
> 256) {
519 x
= realloc(argv
, sizeof(char*) * (ul
+ 2));
525 memset(x
+ n_argv
, 0, sizeof(char*) * (ul
- n_argv
+ 2));
531 r
= parse_address_key(p
, NULL
, argv
+ ul
);
544 /* Make sure there are no holes in the array, with the
545 * exception of argv[0] */
546 for (j
= 1; j
< n_argv
; j
++)
552 if (argv
&& argv
[0] == NULL
) {
553 argv
[0] = strdup(path
);
565 for (j
= 0; j
< n_argv
; j
++)
573 static void bus_reset_parsed_address(sd_bus
*b
) {
577 b
->sockaddr_size
= 0;
578 strv_free(b
->exec_argv
);
582 b
->peer
= SD_ID128_NULL
;
585 static int bus_parse_next_address(sd_bus
*b
) {
586 _cleanup_free_
char *guid
= NULL
;
594 if (b
->address
[b
->address_index
] == 0)
597 bus_reset_parsed_address(b
);
599 a
= b
->address
+ b
->address_index
;
608 if (startswith(a
, "unix:")) {
611 r
= parse_unix_address(b
, &a
, &guid
);
616 } else if (startswith(a
, "tcp:")) {
619 r
= parse_tcp_address(b
, &a
, &guid
);
625 } else if (startswith(a
, "unixexec:")) {
628 r
= parse_exec_address(b
, &a
, &guid
);
642 r
= sd_id128_from_string(guid
, &b
->peer
);
647 b
->address_index
= a
- b
->address
;
651 static int bus_start_address(sd_bus
*b
) {
658 close_nointr_nofail(b
->fd
);
662 if (b
->sockaddr
.sa
.sa_family
!= AF_UNSPEC
) {
664 r
= bus_socket_connect(b
);
668 b
->last_connect_error
= -r
;
670 } else if (b
->exec_path
) {
672 r
= bus_socket_exec(b
);
676 b
->last_connect_error
= -r
;
679 r
= bus_parse_next_address(b
);
683 return b
->last_connect_error
? -b
->last_connect_error
: -ECONNREFUSED
;
687 int bus_next_address(sd_bus
*b
) {
690 bus_reset_parsed_address(b
);
691 return bus_start_address(b
);
694 static int bus_start_fd(sd_bus
*b
) {
699 r
= fd_nonblock(b
->fd
, true);
703 r
= fd_cloexec(b
->fd
, true);
707 return bus_socket_take_fd(b
);
710 int sd_bus_start(sd_bus
*bus
) {
715 if (bus
->state
!= BUS_UNSET
)
718 bus
->state
= BUS_OPENING
;
721 r
= bus_start_fd(bus
);
722 else if (bus
->address
)
723 r
= bus_start_address(bus
);
730 return bus_send_hello(bus
);
733 int sd_bus_open_system(sd_bus
**ret
) {
745 e
= getenv("DBUS_SYSTEM_BUS_ADDRESS");
747 r
= sd_bus_set_address(b
, e
);
751 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
752 strncpy(b
->sockaddr
.un
.sun_path
, "/run/dbus/system_bus_socket", sizeof(b
->sockaddr
.un
.sun_path
));
753 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + sizeof("/run/dbus/system_bus_socket") - 1;
756 b
->send_hello
= true;
770 int sd_bus_open_user(sd_bus
**ret
) {
783 e
= getenv("DBUS_SESSION_BUS_ADDRESS");
785 r
= sd_bus_set_address(b
, e
);
789 e
= getenv("XDG_RUNTIME_DIR");
796 if (l
+ 4 > sizeof(b
->sockaddr
.un
.sun_path
)) {
801 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
802 memcpy(mempcpy(b
->sockaddr
.un
.sun_path
, e
, l
), "/bus", 4);
803 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
+ 4;
806 b
->send_hello
= true;
820 void sd_bus_close(sd_bus
*bus
) {
826 close_nointr_nofail(bus
->fd
);
830 sd_bus
*sd_bus_ref(sd_bus
*bus
) {
834 assert(bus
->n_ref
> 0);
840 sd_bus
*sd_bus_unref(sd_bus
*bus
) {
844 assert(bus
->n_ref
> 0);
853 int sd_bus_is_open(sd_bus
*bus
) {
857 return bus
->state
!= BUS_UNSET
&& bus
->fd
>= 0;
860 int sd_bus_can_send(sd_bus
*bus
, char type
) {
868 if (type
== SD_BUS_TYPE_UNIX_FD
) {
869 if (!bus
->negotiate_fds
)
872 r
= bus_ensure_running(bus
);
879 return bus_type_is_valid(type
);
882 int sd_bus_get_peer(sd_bus
*bus
, sd_id128_t
*peer
) {
890 r
= bus_ensure_running(bus
);
898 static int bus_seal_message(sd_bus
*b
, sd_bus_message
*m
) {
901 if (m
->header
->version
> b
->message_version
)
907 return bus_message_seal(m
, ++b
->serial
);
910 static int dispatch_wqueue(sd_bus
*bus
) {
914 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
919 while (bus
->wqueue_size
> 0) {
921 r
= bus_socket_write_message(bus
, bus
->wqueue
[0], &bus
->windex
);
926 /* Didn't do anything this time */
928 else if (bus
->windex
>= bus
->wqueue
[0]->size
) {
929 /* Fully written. Let's drop the entry from the queue.
932 * This isn't particularly optimized, but
933 * well, this is supposed to be our worst-case
934 * buffer only, and the socket buffer is
935 * supposed to be our primary buffer, and if
936 * it got full, then all bets are off anyway. */
939 sd_bus_message_unref(bus
->wqueue
[0]);
941 memmove(bus
->wqueue
, bus
->wqueue
+ 1, sizeof(sd_bus_message
*) * bus
->wqueue_size
);
951 static int dispatch_rqueue(sd_bus
*bus
, sd_bus_message
**m
) {
952 sd_bus_message
*z
= NULL
;
957 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
962 if (bus
->rqueue_size
> 0) {
963 /* Dispatch a queued message */
967 memmove(bus
->rqueue
, bus
->rqueue
+ 1, sizeof(sd_bus_message
*) * bus
->rqueue_size
);
971 /* Try to read a new message */
973 r
= bus_socket_read_message(bus
, &z
);
988 int sd_bus_send(sd_bus
*bus
, sd_bus_message
*m
, uint64_t *serial
) {
993 if (bus
->state
== BUS_UNSET
)
1001 r
= sd_bus_can_send(bus
, SD_BUS_TYPE_UNIX_FD
);
1008 /* If the serial number isn't kept, then we know that no reply is expected */
1010 if (!serial
&& !m
->sealed
)
1011 m
->header
->flags
|= SD_BUS_MESSAGE_NO_REPLY_EXPECTED
;
1013 r
= bus_seal_message(bus
, m
);
1017 /* If this is a reply and no reply was requested, then let's
1018 * suppress this, if we can */
1019 if (m
->dont_send
&& !serial
)
1022 if ((bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) && bus
->wqueue_size
<= 0) {
1025 r
= bus_socket_write_message(bus
, m
, &idx
);
1029 } else if (idx
< m
->size
) {
1030 /* Wasn't fully written. So let's remember how
1031 * much was written. Note that the first entry
1032 * of the wqueue array is always allocated so
1033 * that we always can remember how much was written. */
1035 bus
->wqueue
[0] = sd_bus_message_ref(m
);
1036 bus
->wqueue_size
= 1;
1042 /* Just append it to the queue. */
1044 if (bus
->wqueue_size
>= BUS_WQUEUE_MAX
)
1047 q
= realloc(bus
->wqueue
, sizeof(sd_bus_message
*) * (bus
->wqueue_size
+ 1));
1052 q
[bus
->wqueue_size
++] = sd_bus_message_ref(m
);
1056 *serial
= BUS_MESSAGE_SERIAL(m
);
1061 static usec_t
calc_elapse(uint64_t usec
) {
1062 if (usec
== (uint64_t) -1)
1066 usec
= BUS_DEFAULT_TIMEOUT
;
1068 return now(CLOCK_MONOTONIC
) + usec
;
1071 static int timeout_compare(const void *a
, const void *b
) {
1072 const struct reply_callback
*x
= a
, *y
= b
;
1074 if (x
->timeout
!= 0 && y
->timeout
== 0)
1077 if (x
->timeout
== 0 && y
->timeout
!= 0)
1080 if (x
->timeout
< y
->timeout
)
1083 if (x
->timeout
> y
->timeout
)
1089 int sd_bus_send_with_reply(
1092 sd_message_handler_t callback
,
1097 struct reply_callback
*c
;
1102 if (bus
->state
== BUS_UNSET
)
1110 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1112 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1115 r
= hashmap_ensure_allocated(&bus
->reply_callbacks
, uint64_hash_func
, uint64_compare_func
);
1119 if (usec
!= (uint64_t) -1) {
1120 r
= prioq_ensure_allocated(&bus
->reply_callbacks_prioq
, timeout_compare
);
1125 r
= bus_seal_message(bus
, m
);
1129 c
= new(struct reply_callback
, 1);
1133 c
->callback
= callback
;
1134 c
->userdata
= userdata
;
1135 c
->serial
= BUS_MESSAGE_SERIAL(m
);
1136 c
->timeout
= calc_elapse(usec
);
1138 r
= hashmap_put(bus
->reply_callbacks
, &c
->serial
, c
);
1144 if (c
->timeout
!= 0) {
1145 r
= prioq_put(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1148 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1153 r
= sd_bus_send(bus
, m
, serial
);
1155 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1162 int sd_bus_send_with_reply_cancel(sd_bus
*bus
, uint64_t serial
) {
1163 struct reply_callback
*c
;
1170 c
= hashmap_remove(bus
->reply_callbacks
, &serial
);
1174 if (c
->timeout
!= 0)
1175 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1181 int bus_ensure_running(sd_bus
*bus
) {
1188 if (bus
->state
== BUS_UNSET
)
1191 if (bus
->state
== BUS_RUNNING
)
1195 r
= sd_bus_process(bus
, NULL
);
1198 if (bus
->state
== BUS_RUNNING
)
1203 r
= sd_bus_wait(bus
, (uint64_t) -1);
1209 int sd_bus_send_with_reply_and_block(
1213 sd_bus_error
*error
,
1214 sd_bus_message
**reply
) {
1225 if (bus
->state
== BUS_UNSET
)
1229 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1231 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1233 if (bus_error_is_dirty(error
))
1236 r
= bus_ensure_running(bus
);
1240 r
= sd_bus_send(bus
, m
, &serial
);
1244 timeout
= calc_elapse(usec
);
1248 sd_bus_message
*incoming
= NULL
;
1253 if (bus
->rqueue_size
>= BUS_RQUEUE_MAX
)
1256 /* Make sure there's room for queuing this
1257 * locally, before we read the message */
1259 q
= realloc(bus
->rqueue
, (bus
->rqueue_size
+ 1) * sizeof(sd_bus_message
*));
1267 r
= bus_socket_read_message(bus
, &incoming
);
1272 if (incoming
->reply_serial
== serial
) {
1273 /* Found a match! */
1275 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_RETURN
) {
1280 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_ERROR
) {
1283 r
= sd_bus_error_copy(error
, &incoming
->error
);
1285 sd_bus_message_unref(incoming
);
1289 k
= bus_error_to_errno(&incoming
->error
);
1290 sd_bus_message_unref(incoming
);
1294 sd_bus_message_unref(incoming
);
1298 /* There's already guaranteed to be room for
1299 * this, so no need to resize things here */
1300 bus
->rqueue
[bus
->rqueue_size
++] = incoming
;
1303 /* Try to read more, right-away */
1312 n
= now(CLOCK_MONOTONIC
);
1318 left
= (uint64_t) -1;
1320 r
= bus_poll(bus
, true, left
);
1324 r
= dispatch_wqueue(bus
);
1330 int sd_bus_get_fd(sd_bus
*bus
) {
1340 int sd_bus_get_events(sd_bus
*bus
) {
1345 if (bus
->state
== BUS_UNSET
)
1350 if (bus
->state
== BUS_OPENING
)
1352 else if (bus
->state
== BUS_AUTHENTICATING
) {
1354 if (bus
->auth_index
< ELEMENTSOF(bus
->auth_iovec
))
1359 } else if (bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) {
1360 if (bus
->rqueue_size
<= 0)
1362 if (bus
->wqueue_size
> 0)
1369 int sd_bus_get_timeout(sd_bus
*bus
, uint64_t *timeout_usec
) {
1370 struct reply_callback
*c
;
1376 if (bus
->state
== BUS_UNSET
)
1381 if (bus
->state
== BUS_AUTHENTICATING
) {
1382 *timeout_usec
= bus
->auth_timeout
;
1386 if (bus
->state
!= BUS_RUNNING
&& bus
->state
!= BUS_HELLO
)
1389 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1393 *timeout_usec
= c
->timeout
;
1397 static int process_timeout(sd_bus
*bus
) {
1398 struct reply_callback
*c
;
1404 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1408 n
= now(CLOCK_MONOTONIC
);
1412 assert_se(prioq_pop(bus
->reply_callbacks_prioq
) == c
);
1413 hashmap_remove(bus
->reply_callbacks
, &c
->serial
);
1415 r
= c
->callback(bus
, ETIMEDOUT
, NULL
, c
->userdata
);
1418 return r
< 0 ? r
: 1;
1421 static int process_reply(sd_bus
*bus
, sd_bus_message
*m
) {
1422 struct reply_callback
*c
;
1428 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_RETURN
&&
1429 m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_ERROR
)
1432 c
= hashmap_remove(bus
->reply_callbacks
, &m
->reply_serial
);
1436 if (c
->timeout
!= 0)
1437 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1439 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1445 static int process_filter(sd_bus
*bus
, sd_bus_message
*m
) {
1446 struct filter_callback
*l
;
1449 LIST_FOREACH(callbacks
, l
, bus
->filter_callbacks
) {
1450 r
= l
->callback(bus
, 0, m
, l
->userdata
);
1458 static int process_builtin(sd_bus
*bus
, sd_bus_message
*m
) {
1459 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1465 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1468 if (!streq_ptr(m
->interface
, "org.freedesktop.DBus.Peer"))
1471 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1474 if (streq_ptr(m
->member
, "Ping"))
1475 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1476 else if (streq_ptr(m
->member
, "GetMachineId")) {
1480 r
= sd_id128_get_machine(&id
);
1484 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1488 r
= sd_bus_message_append(reply
, "s", sd_id128_to_string(id
, sid
));
1490 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1492 sd_bus_error_set(&error
,
1493 "org.freedesktop.DBus.Error.UnknownMethod",
1494 "Unknown method '%s' on interface '%s'.", m
->member
, m
->interface
);
1496 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1502 r
= sd_bus_send(bus
, reply
, NULL
);
1509 static int process_object(sd_bus
*bus
, sd_bus_message
*m
) {
1510 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1511 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1512 struct object_callback
*c
;
1520 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1523 if (hashmap_isempty(bus
->object_callbacks
))
1526 c
= hashmap_get(bus
->object_callbacks
, m
->path
);
1528 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1535 /* Look for fallback prefixes */
1536 p
= strdupa(m
->path
);
1540 e
= strrchr(p
, '/');
1546 c
= hashmap_get(bus
->object_callbacks
, p
);
1547 if (c
&& c
->is_fallback
) {
1548 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1559 sd_bus_error_set(&error
,
1560 "org.freedesktop.DBus.Error.UnknownMethod",
1561 "Unknown method '%s' or interface '%s'.", m
->member
, m
->interface
);
1563 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1567 r
= sd_bus_send(bus
, reply
, NULL
);
1574 static int process_message(sd_bus
*bus
, sd_bus_message
*m
) {
1580 r
= process_reply(bus
, m
);
1584 r
= process_filter(bus
, m
);
1588 r
= process_builtin(bus
, m
);
1592 return process_object(bus
, m
);
1595 static int process_running(sd_bus
*bus
, sd_bus_message
**ret
) {
1596 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
1600 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
1602 r
= process_timeout(bus
);
1606 r
= dispatch_wqueue(bus
);
1610 r
= dispatch_rqueue(bus
, &m
);
1616 r
= process_message(bus
, m
);
1626 if (m
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_CALL
) {
1627 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1628 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1630 sd_bus_error_set(&error
, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m
->path
);
1632 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1636 r
= sd_bus_send(bus
, reply
, NULL
);
1650 int sd_bus_process(sd_bus
*bus
, sd_bus_message
**ret
) {
1653 /* Returns 0 when we didn't do anything. This should cause the
1654 * caller to invoke sd_bus_wait() before returning the next
1655 * time. Returns > 0 when we did something, which possibly
1656 * means *ret is filled in with an unprocessed message. */
1663 switch (bus
->state
) {
1669 r
= bus_socket_process_opening(bus
);
1676 case BUS_AUTHENTICATING
:
1678 r
= bus_socket_process_authenticating(bus
);
1688 return process_running(bus
, ret
);
1691 assert_not_reached("Unknown state");
1694 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
) {
1705 e
= sd_bus_get_events(bus
);
1712 r
= sd_bus_get_timeout(bus
, &until
);
1719 n
= now(CLOCK_MONOTONIC
);
1720 m
= until
> n
? until
- n
: 0;
1723 if (timeout_usec
!= (uint64_t) -1 && (m
== (uint64_t) -1 || timeout_usec
< m
))
1730 r
= ppoll(&p
, 1, m
== (uint64_t) -1 ? NULL
: timespec_store(&ts
, m
), NULL
);
1734 return r
> 0 ? 1 : 0;
1737 int sd_bus_wait(sd_bus
*bus
, uint64_t timeout_usec
) {
1741 if (bus
->state
== BUS_UNSET
)
1745 if (bus
->rqueue_size
> 0)
1748 return bus_poll(bus
, false, timeout_usec
);
1751 int sd_bus_flush(sd_bus
*bus
) {
1756 if (bus
->state
== BUS_UNSET
)
1761 r
= bus_ensure_running(bus
);
1765 if (bus
->wqueue_size
<= 0)
1769 r
= dispatch_wqueue(bus
);
1773 if (bus
->wqueue_size
<= 0)
1776 r
= bus_poll(bus
, false, (uint64_t) -1);
1782 int sd_bus_add_filter(sd_bus
*bus
, sd_message_handler_t callback
, void *userdata
) {
1783 struct filter_callback
*f
;
1790 f
= new(struct filter_callback
, 1);
1793 f
->callback
= callback
;
1794 f
->userdata
= userdata
;
1796 LIST_PREPEND(struct filter_callback
, callbacks
, bus
->filter_callbacks
, f
);
1800 int sd_bus_remove_filter(sd_bus
*bus
, sd_message_handler_t callback
, void *userdata
) {
1801 struct filter_callback
*f
;
1808 LIST_FOREACH(callbacks
, f
, bus
->filter_callbacks
) {
1809 if (f
->callback
== callback
&& f
->userdata
== userdata
) {
1810 LIST_REMOVE(struct filter_callback
, callbacks
, bus
->filter_callbacks
, f
);
1819 static int bus_add_object(
1823 sd_message_handler_t callback
,
1826 struct object_callback
*c
;
1836 r
= hashmap_ensure_allocated(&bus
->object_callbacks
, string_hash_func
, string_compare_func
);
1840 c
= new(struct object_callback
, 1);
1844 c
->path
= strdup(path
);
1850 c
->callback
= callback
;
1851 c
->userdata
= userdata
;
1852 c
->is_fallback
= fallback
;
1854 r
= hashmap_put(bus
->object_callbacks
, c
->path
, c
);
1864 static int bus_remove_object(
1868 sd_message_handler_t callback
,
1871 struct object_callback
*c
;
1880 c
= hashmap_get(bus
->object_callbacks
, path
);
1884 if (c
->callback
!= callback
|| c
->userdata
!= userdata
|| c
->is_fallback
!= fallback
)
1887 assert_se(c
== hashmap_remove(bus
->object_callbacks
, c
->path
));
1895 int sd_bus_add_object(sd_bus
*bus
, const char *path
, sd_message_handler_t callback
, void *userdata
) {
1896 return bus_add_object(bus
, false, path
, callback
, userdata
);
1899 int sd_bus_remove_object(sd_bus
*bus
, const char *path
, sd_message_handler_t callback
, void *userdata
) {
1900 return bus_remove_object(bus
, false, path
, callback
, userdata
);
1903 int sd_bus_add_fallback(sd_bus
*bus
, const char *prefix
, sd_message_handler_t callback
, void *userdata
) {
1904 return bus_add_object(bus
, true, prefix
, callback
, userdata
);
1907 int sd_bus_remove_fallback(sd_bus
*bus
, const char *prefix
, sd_message_handler_t callback
, void *userdata
) {
1908 return bus_remove_object(bus
, true, prefix
, callback
, userdata
);