1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
41 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
);
43 static void bus_free(sd_bus
*b
) {
44 struct filter_callback
*f
;
45 struct object_callback
*c
;
51 close_nointr_nofail(b
->fd
);
59 strv_free(b
->exec_argv
);
61 close_many(b
->fds
, b
->n_fds
);
64 for (i
= 0; i
< b
->rqueue_size
; i
++)
65 sd_bus_message_unref(b
->rqueue
[i
]);
68 for (i
= 0; i
< b
->wqueue_size
; i
++)
69 sd_bus_message_unref(b
->wqueue
[i
]);
72 hashmap_free_free(b
->reply_callbacks
);
73 prioq_free(b
->reply_callbacks_prioq
);
75 while ((f
= b
->filter_callbacks
)) {
76 LIST_REMOVE(struct filter_callback
, callbacks
, b
->filter_callbacks
, f
);
80 while ((c
= hashmap_steal_first(b
->object_callbacks
))) {
85 hashmap_free(b
->object_callbacks
);
90 int sd_bus_new(sd_bus
**ret
) {
102 r
->message_version
= 1;
103 r
->negotiate_fds
= true;
105 /* We guarantee that wqueue always has space for at least one
107 r
->wqueue
= new(sd_bus_message
*, 1);
117 int sd_bus_set_address(sd_bus
*bus
, const char *address
) {
122 if (bus
->state
!= BUS_UNSET
)
137 int sd_bus_set_fd(sd_bus
*bus
, int fd
) {
140 if (bus
->state
!= BUS_UNSET
)
149 int sd_bus_set_exec(sd_bus
*bus
, const char *path
, char *const argv
[]) {
154 if (bus
->state
!= BUS_UNSET
)
158 if (strv_isempty(argv
))
171 free(bus
->exec_path
);
172 strv_free(bus
->exec_argv
);
180 int sd_bus_set_bus_client(sd_bus
*bus
, int b
) {
183 if (bus
->state
!= BUS_UNSET
)
186 bus
->bus_client
= !!b
;
190 int sd_bus_set_negotiate_fds(sd_bus
*bus
, int b
) {
193 if (bus
->state
!= BUS_UNSET
)
196 bus
->negotiate_fds
= !!b
;
200 static int hello_callback(sd_bus
*bus
, int error
, sd_bus_message
*reply
, void *userdata
) {
205 assert(bus
->state
== BUS_HELLO
);
212 r
= sd_bus_message_read(reply
, "s", &s
);
216 if (!service_name_is_valid(s
) || s
[0] != ':')
219 bus
->unique_name
= strdup(s
);
220 if (!bus
->unique_name
)
223 bus
->state
= BUS_RUNNING
;
228 static int bus_send_hello(sd_bus
*bus
) {
229 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
234 if (!bus
->bus_client
)
237 r
= sd_bus_message_new_method_call(
239 "org.freedesktop.DBus",
241 "org.freedesktop.DBus",
247 return sd_bus_send_with_reply(bus
, m
, hello_callback
, NULL
, 0, &bus
->hello_serial
);
250 int bus_start_running(sd_bus
*bus
) {
253 if (bus
->bus_client
) {
254 bus
->state
= BUS_HELLO
;
258 bus
->state
= BUS_RUNNING
;
262 static int parse_address_key(const char **p
, const char *key
, char **value
) {
273 if (strncmp(*p
, key
, l
) != 0)
286 while (*a
!= ';' && *a
!= ',' && *a
!= 0) {
304 c
= (char) ((x
<< 4) | y
);
311 t
= realloc(r
, n
+ 2);
339 static void skip_address_key(const char **p
) {
343 *p
+= strcspn(*p
, ",");
349 static int parse_unix_address(sd_bus
*b
, const char **p
, char **guid
) {
350 _cleanup_free_
char *path
= NULL
, *abstract
= NULL
;
359 while (**p
!= 0 && **p
!= ';') {
360 r
= parse_address_key(p
, "guid", guid
);
366 r
= parse_address_key(p
, "path", &path
);
372 r
= parse_address_key(p
, "abstract", &abstract
);
381 if (!path
&& !abstract
)
384 if (path
&& abstract
)
389 if (l
> sizeof(b
->sockaddr
.un
.sun_path
))
392 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
393 strncpy(b
->sockaddr
.un
.sun_path
, path
, sizeof(b
->sockaddr
.un
.sun_path
));
394 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
;
395 } else if (abstract
) {
396 l
= strlen(abstract
);
397 if (l
> sizeof(b
->sockaddr
.un
.sun_path
) - 1)
400 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
401 b
->sockaddr
.un
.sun_path
[0] = 0;
402 strncpy(b
->sockaddr
.un
.sun_path
+1, abstract
, sizeof(b
->sockaddr
.un
.sun_path
)-1);
403 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + 1 + l
;
409 static int parse_tcp_address(sd_bus
*b
, const char **p
, char **guid
) {
410 _cleanup_free_
char *host
= NULL
, *port
= NULL
, *family
= NULL
;
411 struct addrinfo hints
, *result
;
419 while (**p
!= 0 && **p
!= ';') {
420 r
= parse_address_key(p
, "guid", guid
);
426 r
= parse_address_key(p
, "host", &host
);
432 r
= parse_address_key(p
, "port", &port
);
438 r
= parse_address_key(p
, "family", &family
);
451 hints
.ai_socktype
= SOCK_STREAM
;
452 hints
.ai_flags
= AI_ADDRCONFIG
;
455 if (streq(family
, "ipv4"))
456 hints
.ai_family
= AF_INET
;
457 else if (streq(family
, "ipv6"))
458 hints
.ai_family
= AF_INET6
;
463 r
= getaddrinfo(host
, port
, &hints
, &result
);
467 return -EADDRNOTAVAIL
;
469 memcpy(&b
->sockaddr
, result
->ai_addr
, result
->ai_addrlen
);
470 b
->sockaddr_size
= result
->ai_addrlen
;
472 freeaddrinfo(result
);
477 static int parse_exec_address(sd_bus
*b
, const char **p
, char **guid
) {
479 unsigned n_argv
= 0, j
;
488 while (**p
!= 0 && **p
!= ';') {
489 r
= parse_address_key(p
, "guid", guid
);
495 r
= parse_address_key(p
, "path", &path
);
501 if (startswith(*p
, "argv")) {
505 ul
= strtoul(*p
+ 4, (char**) p
, 10);
506 if (errno
> 0 || **p
!= '=' || ul
> 256) {
516 x
= realloc(argv
, sizeof(char*) * (ul
+ 2));
522 memset(x
+ n_argv
, 0, sizeof(char*) * (ul
- n_argv
+ 2));
528 r
= parse_address_key(p
, NULL
, argv
+ ul
);
543 /* Make sure there are no holes in the array, with the
544 * exception of argv[0] */
545 for (j
= 1; j
< n_argv
; j
++)
551 if (argv
&& argv
[0] == NULL
) {
552 argv
[0] = strdup(path
);
564 for (j
= 0; j
< n_argv
; j
++)
572 static void bus_reset_parsed_address(sd_bus
*b
) {
576 b
->sockaddr_size
= 0;
577 strv_free(b
->exec_argv
);
581 b
->peer
= SD_ID128_NULL
;
584 static int bus_parse_next_address(sd_bus
*b
) {
585 _cleanup_free_
char *guid
= NULL
;
593 if (b
->address
[b
->address_index
] == 0)
596 bus_reset_parsed_address(b
);
598 a
= b
->address
+ b
->address_index
;
607 if (startswith(a
, "unix:")) {
610 r
= parse_unix_address(b
, &a
, &guid
);
615 } else if (startswith(a
, "tcp:")) {
618 r
= parse_tcp_address(b
, &a
, &guid
);
624 } else if (startswith(a
, "unixexec:")) {
627 r
= parse_exec_address(b
, &a
, &guid
);
641 r
= sd_id128_from_string(guid
, &b
->peer
);
646 b
->address_index
= a
- b
->address
;
650 static int bus_start_address(sd_bus
*b
) {
657 close_nointr_nofail(b
->fd
);
661 if (b
->sockaddr
.sa
.sa_family
!= AF_UNSPEC
) {
663 r
= bus_socket_connect(b
);
667 b
->last_connect_error
= -r
;
669 } else if (b
->exec_path
) {
671 r
= bus_socket_exec(b
);
675 b
->last_connect_error
= -r
;
678 r
= bus_parse_next_address(b
);
682 return b
->last_connect_error
? -b
->last_connect_error
: -ECONNREFUSED
;
686 int bus_next_address(sd_bus
*b
) {
689 bus_reset_parsed_address(b
);
690 return bus_start_address(b
);
693 static int bus_start_fd(sd_bus
*b
) {
698 r
= fd_nonblock(b
->fd
, true);
702 r
= fd_cloexec(b
->fd
, true);
706 return bus_socket_take_fd(b
);
709 int sd_bus_start(sd_bus
*bus
) {
714 if (bus
->state
!= BUS_UNSET
)
717 bus
->state
= BUS_OPENING
;
720 r
= bus_start_fd(bus
);
721 else if (bus
->address
)
722 r
= bus_start_address(bus
);
729 return bus_send_hello(bus
);
732 int sd_bus_open_system(sd_bus
**ret
) {
744 e
= getenv("DBUS_SYSTEM_BUS_ADDRESS");
746 r
= sd_bus_set_address(b
, e
);
750 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
751 strncpy(b
->sockaddr
.un
.sun_path
, "/run/dbus/system_bus_socket", sizeof(b
->sockaddr
.un
.sun_path
));
752 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + sizeof("/run/dbus/system_bus_socket") - 1;
755 b
->bus_client
= true;
769 int sd_bus_open_user(sd_bus
**ret
) {
782 e
= getenv("DBUS_SESSION_BUS_ADDRESS");
784 r
= sd_bus_set_address(b
, e
);
788 e
= getenv("XDG_RUNTIME_DIR");
795 if (l
+ 4 > sizeof(b
->sockaddr
.un
.sun_path
)) {
800 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
801 memcpy(mempcpy(b
->sockaddr
.un
.sun_path
, e
, l
), "/bus", 4);
802 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
+ 4;
805 b
->bus_client
= true;
819 void sd_bus_close(sd_bus
*bus
) {
825 close_nointr_nofail(bus
->fd
);
829 sd_bus
*sd_bus_ref(sd_bus
*bus
) {
833 assert(bus
->n_ref
> 0);
839 sd_bus
*sd_bus_unref(sd_bus
*bus
) {
843 assert(bus
->n_ref
> 0);
852 int sd_bus_is_open(sd_bus
*bus
) {
856 return bus
->state
!= BUS_UNSET
&& bus
->fd
>= 0;
859 int sd_bus_can_send(sd_bus
*bus
, char type
) {
867 if (type
== SD_BUS_TYPE_UNIX_FD
) {
868 if (!bus
->negotiate_fds
)
871 r
= bus_ensure_running(bus
);
878 return bus_type_is_valid(type
);
881 int sd_bus_get_peer(sd_bus
*bus
, sd_id128_t
*peer
) {
889 r
= bus_ensure_running(bus
);
897 static int bus_seal_message(sd_bus
*b
, sd_bus_message
*m
) {
900 if (m
->header
->version
> b
->message_version
)
906 return bus_message_seal(m
, ++b
->serial
);
909 static int dispatch_wqueue(sd_bus
*bus
) {
913 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
918 while (bus
->wqueue_size
> 0) {
920 r
= bus_socket_write_message(bus
, bus
->wqueue
[0], &bus
->windex
);
925 /* Didn't do anything this time */
927 else if (bus
->windex
>= bus
->wqueue
[0]->size
) {
928 /* Fully written. Let's drop the entry from
931 * This isn't particularly optimized, but
932 * well, this is supposed to be our worst-case
933 * buffer only, and the socket buffer is
934 * supposed to be our primary buffer, and if
935 * it got full, then all bets are off
938 sd_bus_message_unref(bus
->wqueue
[0]);
940 memmove(bus
->wqueue
, bus
->wqueue
+ 1, sizeof(sd_bus_message
*) * bus
->wqueue_size
);
950 static int dispatch_rqueue(sd_bus
*bus
, sd_bus_message
**m
) {
951 sd_bus_message
*z
= NULL
;
956 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
961 if (bus
->rqueue_size
> 0) {
962 /* Dispatch a queued message */
966 memmove(bus
->rqueue
, bus
->rqueue
+ 1, sizeof(sd_bus_message
*) * bus
->rqueue_size
);
970 /* Try to read a new message */
972 r
= bus_socket_read_message(bus
, &z
);
987 int sd_bus_send(sd_bus
*bus
, sd_bus_message
*m
, uint64_t *serial
) {
992 if (bus
->state
== BUS_UNSET
)
1000 r
= sd_bus_can_send(bus
, SD_BUS_TYPE_UNIX_FD
);
1007 /* If the serial number isn't kept, then we know that no reply
1009 if (!serial
&& !m
->sealed
)
1010 m
->header
->flags
|= SD_BUS_MESSAGE_NO_REPLY_EXPECTED
;
1012 r
= bus_seal_message(bus
, m
);
1016 /* If this is a reply and no reply was requested, then let's
1017 * suppress this, if we can */
1018 if (m
->dont_send
&& !serial
)
1021 if ((bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) && bus
->wqueue_size
<= 0) {
1024 r
= bus_socket_write_message(bus
, m
, &idx
);
1028 } else if (idx
< m
->size
) {
1029 /* Wasn't fully written. So let's remember how
1030 * much was written. Note that the first entry
1031 * of the wqueue array is always allocated so
1032 * that we always can remember how much was
1034 bus
->wqueue
[0] = sd_bus_message_ref(m
);
1035 bus
->wqueue_size
= 1;
1041 /* Just append it to the queue. */
1043 if (bus
->wqueue_size
>= BUS_WQUEUE_MAX
)
1046 q
= realloc(bus
->wqueue
, sizeof(sd_bus_message
*) * (bus
->wqueue_size
+ 1));
1051 q
[bus
->wqueue_size
++] = sd_bus_message_ref(m
);
1055 *serial
= BUS_MESSAGE_SERIAL(m
);
1060 static usec_t
calc_elapse(uint64_t usec
) {
1061 if (usec
== (uint64_t) -1)
1065 usec
= BUS_DEFAULT_TIMEOUT
;
1067 return now(CLOCK_MONOTONIC
) + usec
;
1070 static int timeout_compare(const void *a
, const void *b
) {
1071 const struct reply_callback
*x
= a
, *y
= b
;
1073 if (x
->timeout
!= 0 && y
->timeout
== 0)
1076 if (x
->timeout
== 0 && y
->timeout
!= 0)
1079 if (x
->timeout
< y
->timeout
)
1082 if (x
->timeout
> y
->timeout
)
1088 int sd_bus_send_with_reply(
1091 sd_message_handler_t callback
,
1096 struct reply_callback
*c
;
1101 if (bus
->state
== BUS_UNSET
)
1109 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1111 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1114 r
= hashmap_ensure_allocated(&bus
->reply_callbacks
, uint64_hash_func
, uint64_compare_func
);
1118 if (usec
!= (uint64_t) -1) {
1119 r
= prioq_ensure_allocated(&bus
->reply_callbacks_prioq
, timeout_compare
);
1124 r
= bus_seal_message(bus
, m
);
1128 c
= new(struct reply_callback
, 1);
1132 c
->callback
= callback
;
1133 c
->userdata
= userdata
;
1134 c
->serial
= BUS_MESSAGE_SERIAL(m
);
1135 c
->timeout
= calc_elapse(usec
);
1137 r
= hashmap_put(bus
->reply_callbacks
, &c
->serial
, c
);
1143 if (c
->timeout
!= 0) {
1144 r
= prioq_put(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1147 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1152 r
= sd_bus_send(bus
, m
, serial
);
1154 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1161 int sd_bus_send_with_reply_cancel(sd_bus
*bus
, uint64_t serial
) {
1162 struct reply_callback
*c
;
1169 c
= hashmap_remove(bus
->reply_callbacks
, &serial
);
1173 if (c
->timeout
!= 0)
1174 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1180 int bus_ensure_running(sd_bus
*bus
) {
1187 if (bus
->state
== BUS_UNSET
)
1190 if (bus
->state
== BUS_RUNNING
)
1194 r
= sd_bus_process(bus
, NULL
);
1197 if (bus
->state
== BUS_RUNNING
)
1202 r
= sd_bus_wait(bus
, (uint64_t) -1);
1208 int sd_bus_send_with_reply_and_block(
1212 sd_bus_error
*error
,
1213 sd_bus_message
**reply
) {
1224 if (bus
->state
== BUS_UNSET
)
1228 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1230 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1232 if (bus_error_is_dirty(error
))
1235 r
= bus_ensure_running(bus
);
1239 r
= sd_bus_send(bus
, m
, &serial
);
1243 timeout
= calc_elapse(usec
);
1247 sd_bus_message
*incoming
= NULL
;
1252 if (bus
->rqueue_size
>= BUS_RQUEUE_MAX
)
1255 /* Make sure there's room for queuing this
1256 * locally, before we read the message */
1258 q
= realloc(bus
->rqueue
, (bus
->rqueue_size
+ 1) * sizeof(sd_bus_message
*));
1266 r
= bus_socket_read_message(bus
, &incoming
);
1271 if (incoming
->reply_serial
== serial
) {
1272 /* Found a match! */
1274 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_RETURN
) {
1279 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_ERROR
) {
1282 r
= sd_bus_error_copy(error
, &incoming
->error
);
1284 sd_bus_message_unref(incoming
);
1288 k
= bus_error_to_errno(&incoming
->error
);
1289 sd_bus_message_unref(incoming
);
1293 sd_bus_message_unref(incoming
);
1297 /* There's already guaranteed to be room for
1298 * this, so need to resize things here */
1299 bus
->rqueue
[bus
->rqueue_size
++] = incoming
;
1302 /* Try to read more, right-away */
1311 n
= now(CLOCK_MONOTONIC
);
1317 left
= (uint64_t) -1;
1319 r
= bus_poll(bus
, true, left
);
1323 r
= dispatch_wqueue(bus
);
1329 int sd_bus_get_fd(sd_bus
*bus
) {
1339 int sd_bus_get_events(sd_bus
*bus
) {
1344 if (bus
->state
== BUS_UNSET
)
1349 if (bus
->state
== BUS_OPENING
)
1351 else if (bus
->state
== BUS_AUTHENTICATING
) {
1353 if (bus
->auth_index
< ELEMENTSOF(bus
->auth_iovec
))
1358 } else if (bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) {
1359 if (bus
->rqueue_size
<= 0)
1361 if (bus
->wqueue_size
> 0)
1368 int sd_bus_get_timeout(sd_bus
*bus
, uint64_t *timeout_usec
) {
1369 struct reply_callback
*c
;
1375 if (bus
->state
== BUS_UNSET
)
1380 if (bus
->state
== BUS_AUTHENTICATING
) {
1381 *timeout_usec
= bus
->auth_timeout
;
1385 if (bus
->state
!= BUS_RUNNING
&& bus
->state
!= BUS_HELLO
)
1388 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1392 *timeout_usec
= c
->timeout
;
1396 static int process_timeout(sd_bus
*bus
) {
1397 struct reply_callback
*c
;
1403 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1407 n
= now(CLOCK_MONOTONIC
);
1411 assert_se(prioq_pop(bus
->reply_callbacks_prioq
) == c
);
1412 hashmap_remove(bus
->reply_callbacks
, &c
->serial
);
1414 r
= c
->callback(bus
, ETIMEDOUT
, NULL
, c
->userdata
);
1417 return r
< 0 ? r
: 1;
1420 static int process_hello(sd_bus
*bus
, sd_bus_message
*m
) {
1424 if (bus
->state
!= BUS_HELLO
)
1427 /* Let's make sure the first message on the bus is the HELLO
1428 * reply. But note that we don't actually parse the message
1429 * here (we leave that to the usual reply handling), we just
1430 * verify we don't let any earlier msg through. */
1432 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_RETURN
&&
1433 m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_ERROR
)
1436 if (m
->reply_serial
!= bus
->hello_serial
)
1442 static int process_reply(sd_bus
*bus
, sd_bus_message
*m
) {
1443 struct reply_callback
*c
;
1449 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_RETURN
&&
1450 m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_ERROR
)
1453 c
= hashmap_remove(bus
->reply_callbacks
, &m
->reply_serial
);
1457 if (c
->timeout
!= 0)
1458 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1460 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1466 static int process_filter(sd_bus
*bus
, sd_bus_message
*m
) {
1467 struct filter_callback
*l
;
1470 LIST_FOREACH(callbacks
, l
, bus
->filter_callbacks
) {
1471 r
= l
->callback(bus
, 0, m
, l
->userdata
);
1479 static int process_builtin(sd_bus
*bus
, sd_bus_message
*m
) {
1480 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1486 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1489 if (!streq_ptr(m
->interface
, "org.freedesktop.DBus.Peer"))
1492 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1495 if (streq_ptr(m
->member
, "Ping"))
1496 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1497 else if (streq_ptr(m
->member
, "GetMachineId")) {
1501 r
= sd_id128_get_machine(&id
);
1505 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1509 r
= sd_bus_message_append(reply
, "s", sd_id128_to_string(id
, sid
));
1511 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1513 sd_bus_error_set(&error
,
1514 "org.freedesktop.DBus.Error.UnknownMethod",
1515 "Unknown method '%s' on interface '%s'.", m
->member
, m
->interface
);
1517 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1523 r
= sd_bus_send(bus
, reply
, NULL
);
1530 static int process_object(sd_bus
*bus
, sd_bus_message
*m
) {
1531 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1532 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1533 struct object_callback
*c
;
1541 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1544 if (hashmap_isempty(bus
->object_callbacks
))
1547 c
= hashmap_get(bus
->object_callbacks
, m
->path
);
1549 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1556 /* Look for fallback prefixes */
1557 p
= strdupa(m
->path
);
1561 e
= strrchr(p
, '/');
1567 c
= hashmap_get(bus
->object_callbacks
, p
);
1568 if (c
&& c
->is_fallback
) {
1569 r
= c
->callback(bus
, 0, m
, c
->userdata
);
1577 /* We found some handlers but none wanted to take this, then
1578 * return this -- with one exception, we can handle
1579 * introspection minimally ourselves */
1580 if (!found
|| sd_bus_message_is_method_call(m
, "org.freedesktop.DBus.Introspectable", "Introspect"))
1583 sd_bus_error_set(&error
,
1584 "org.freedesktop.DBus.Error.UnknownMethod",
1585 "Unknown method '%s' or interface '%s'.", m
->member
, m
->interface
);
1587 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1591 r
= sd_bus_send(bus
, reply
, NULL
);
1598 static int process_introspect(sd_bus
*bus
, sd_bus_message
*m
) {
1599 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1600 _cleanup_free_
char *introspection
= NULL
;
1601 _cleanup_set_free_free_ Set
*s
= NULL
;
1602 _cleanup_fclose_
FILE *f
= NULL
;
1603 struct object_callback
*c
;
1612 if (!sd_bus_message_is_method_call(m
, "org.freedesktop.DBus.Introspectable", "Introspect"))
1618 s
= set_new(string_hash_func
, string_compare_func
);
1622 HASHMAP_FOREACH(c
, bus
->object_callbacks
, i
) {
1626 if (streq(c
->path
, "/"))
1629 if (streq(m
->path
, "/"))
1632 e
= startswith(c
->path
, m
->path
);
1633 if (!e
|| *e
!= '/')
1654 f
= open_memstream(&introspection
, &size
);
1658 fputs(SD_BUS_INTROSPECT_DOCTYPE
, f
);
1659 fputs("<node>\n", f
);
1660 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER
, f
);
1661 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE
, f
);
1663 while ((node
= set_steal_first(s
))) {
1664 fprintf(f
, " <node name=\"%s\"/>\n", node
);
1668 fputs("</node>\n", f
);
1675 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1679 r
= sd_bus_message_append(reply
, "s", introspection
);
1683 r
= sd_bus_send(bus
, reply
, NULL
);
1690 static int process_message(sd_bus
*bus
, sd_bus_message
*m
) {
1696 r
= process_hello(bus
, m
);
1700 r
= process_reply(bus
, m
);
1704 r
= process_filter(bus
, m
);
1708 r
= process_builtin(bus
, m
);
1712 r
= process_object(bus
, m
);
1716 return process_introspect(bus
, m
);
1719 static int process_running(sd_bus
*bus
, sd_bus_message
**ret
) {
1720 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
1724 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
1726 r
= process_timeout(bus
);
1730 r
= dispatch_wqueue(bus
);
1734 r
= dispatch_rqueue(bus
, &m
);
1740 r
= process_message(bus
, m
);
1750 if (m
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_CALL
) {
1751 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1752 _cleanup_bus_error_free_ sd_bus_error error
= SD_BUS_ERROR_INIT
;
1754 sd_bus_error_set(&error
, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m
->path
);
1756 r
= sd_bus_message_new_method_error(bus
, m
, &error
, &reply
);
1760 r
= sd_bus_send(bus
, reply
, NULL
);
1774 int sd_bus_process(sd_bus
*bus
, sd_bus_message
**ret
) {
1777 /* Returns 0 when we didn't do anything. This should cause the
1778 * caller to invoke sd_bus_wait() before returning the next
1779 * time. Returns > 0 when we did something, which possibly
1780 * means *ret is filled in with an unprocessed message. */
1787 switch (bus
->state
) {
1793 r
= bus_socket_process_opening(bus
);
1800 case BUS_AUTHENTICATING
:
1802 r
= bus_socket_process_authenticating(bus
);
1812 return process_running(bus
, ret
);
1815 assert_not_reached("Unknown state");
1818 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
) {
1829 e
= sd_bus_get_events(bus
);
1836 r
= sd_bus_get_timeout(bus
, &until
);
1843 n
= now(CLOCK_MONOTONIC
);
1844 m
= until
> n
? until
- n
: 0;
1847 if (timeout_usec
!= (uint64_t) -1 && (m
== (uint64_t) -1 || timeout_usec
< m
))
1854 r
= ppoll(&p
, 1, m
== (uint64_t) -1 ? NULL
: timespec_store(&ts
, m
), NULL
);
1858 return r
> 0 ? 1 : 0;
1861 int sd_bus_wait(sd_bus
*bus
, uint64_t timeout_usec
) {
1865 if (bus
->state
== BUS_UNSET
)
1869 if (bus
->rqueue_size
> 0)
1872 return bus_poll(bus
, false, timeout_usec
);
1875 int sd_bus_flush(sd_bus
*bus
) {
1880 if (bus
->state
== BUS_UNSET
)
1885 r
= bus_ensure_running(bus
);
1889 if (bus
->wqueue_size
<= 0)
1893 r
= dispatch_wqueue(bus
);
1897 if (bus
->wqueue_size
<= 0)
1900 r
= bus_poll(bus
, false, (uint64_t) -1);
1906 int sd_bus_add_filter(sd_bus
*bus
, sd_message_handler_t callback
, void *userdata
) {
1907 struct filter_callback
*f
;
1914 f
= new(struct filter_callback
, 1);
1917 f
->callback
= callback
;
1918 f
->userdata
= userdata
;
1920 LIST_PREPEND(struct filter_callback
, callbacks
, bus
->filter_callbacks
, f
);
1924 int sd_bus_remove_filter(sd_bus
*bus
, sd_message_handler_t callback
, void *userdata
) {
1925 struct filter_callback
*f
;
1932 LIST_FOREACH(callbacks
, f
, bus
->filter_callbacks
) {
1933 if (f
->callback
== callback
&& f
->userdata
== userdata
) {
1934 LIST_REMOVE(struct filter_callback
, callbacks
, bus
->filter_callbacks
, f
);
1943 static int bus_add_object(
1947 sd_message_handler_t callback
,
1950 struct object_callback
*c
;
1960 r
= hashmap_ensure_allocated(&bus
->object_callbacks
, string_hash_func
, string_compare_func
);
1964 c
= new(struct object_callback
, 1);
1968 c
->path
= strdup(path
);
1974 c
->callback
= callback
;
1975 c
->userdata
= userdata
;
1976 c
->is_fallback
= fallback
;
1978 r
= hashmap_put(bus
->object_callbacks
, c
->path
, c
);
1988 static int bus_remove_object(
1992 sd_message_handler_t callback
,
1995 struct object_callback
*c
;
2004 c
= hashmap_get(bus
->object_callbacks
, path
);
2008 if (c
->callback
!= callback
|| c
->userdata
!= userdata
|| c
->is_fallback
!= fallback
)
2011 assert_se(c
== hashmap_remove(bus
->object_callbacks
, c
->path
));
2019 int sd_bus_add_object(sd_bus
*bus
, const char *path
, sd_message_handler_t callback
, void *userdata
) {
2020 return bus_add_object(bus
, false, path
, callback
, userdata
);
2023 int sd_bus_remove_object(sd_bus
*bus
, const char *path
, sd_message_handler_t callback
, void *userdata
) {
2024 return bus_remove_object(bus
, false, path
, callback
, userdata
);
2027 int sd_bus_add_fallback(sd_bus
*bus
, const char *prefix
, sd_message_handler_t callback
, void *userdata
) {
2028 return bus_add_object(bus
, true, prefix
, callback
, userdata
);
2031 int sd_bus_remove_fallback(sd_bus
*bus
, const char *prefix
, sd_message_handler_t callback
, void *userdata
) {
2032 return bus_remove_object(bus
, true, prefix
, callback
, userdata
);