1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
);
51 static void bus_close_fds(sd_bus
*b
) {
55 close_nointr_nofail(b
->input_fd
);
57 if (b
->output_fd
>= 0 && b
->output_fd
!= b
->input_fd
)
58 close_nointr_nofail(b
->output_fd
);
60 b
->input_fd
= b
->output_fd
= -1;
63 static void bus_node_destroy(sd_bus
*b
, struct node
*n
) {
64 struct node_callback
*c
;
65 struct node_vtable
*v
;
66 struct node_enumerator
*e
;
74 bus_node_destroy(b
, n
->child
);
76 while ((c
= n
->callbacks
)) {
77 LIST_REMOVE(callbacks
, n
->callbacks
, c
);
81 while ((v
= n
->vtables
)) {
82 LIST_REMOVE(vtables
, n
->vtables
, v
);
87 while ((e
= n
->enumerators
)) {
88 LIST_REMOVE(enumerators
, n
->enumerators
, e
);
93 LIST_REMOVE(siblings
, n
->parent
->child
, n
);
95 assert_se(hashmap_remove(b
->nodes
, n
->path
) == n
);
100 static void bus_free(sd_bus
*b
) {
101 struct filter_callback
*f
;
110 munmap(b
->kdbus_buffer
, KDBUS_POOL_SIZE
);
113 free(b
->unique_name
);
114 free(b
->auth_buffer
);
119 strv_free(b
->exec_argv
);
121 close_many(b
->fds
, b
->n_fds
);
124 for (i
= 0; i
< b
->rqueue_size
; i
++)
125 sd_bus_message_unref(b
->rqueue
[i
]);
128 for (i
= 0; i
< b
->wqueue_size
; i
++)
129 sd_bus_message_unref(b
->wqueue
[i
]);
132 hashmap_free_free(b
->reply_callbacks
);
133 prioq_free(b
->reply_callbacks_prioq
);
135 while ((f
= b
->filter_callbacks
)) {
136 LIST_REMOVE(callbacks
, b
->filter_callbacks
, f
);
140 bus_match_free(&b
->match_callbacks
);
142 hashmap_free_free(b
->vtable_methods
);
143 hashmap_free_free(b
->vtable_properties
);
145 while ((n
= hashmap_first(b
->nodes
)))
146 bus_node_destroy(b
, n
);
148 hashmap_free(b
->nodes
);
150 bus_kernel_flush_memfd(b
);
152 assert_se(pthread_mutex_destroy(&b
->memfd_cache_mutex
) == 0);
157 int sd_bus_new(sd_bus
**ret
) {
167 r
->n_ref
= REFCNT_INIT
;
168 r
->input_fd
= r
->output_fd
= -1;
169 r
->message_version
= 1;
170 r
->hello_flags
|= KDBUS_HELLO_ACCEPT_FD
;
171 r
->original_pid
= getpid();
173 assert_se(pthread_mutex_init(&r
->memfd_cache_mutex
, NULL
) == 0);
175 /* We guarantee that wqueue always has space for at least one
177 r
->wqueue
= new(sd_bus_message
*, 1);
187 int sd_bus_set_address(sd_bus
*bus
, const char *address
) {
192 if (bus
->state
!= BUS_UNSET
)
196 if (bus_pid_changed(bus
))
209 int sd_bus_set_fd(sd_bus
*bus
, int input_fd
, int output_fd
) {
212 if (bus
->state
!= BUS_UNSET
)
218 if (bus_pid_changed(bus
))
221 bus
->input_fd
= input_fd
;
222 bus
->output_fd
= output_fd
;
226 int sd_bus_set_exec(sd_bus
*bus
, const char *path
, char *const argv
[]) {
231 if (bus
->state
!= BUS_UNSET
)
235 if (strv_isempty(argv
))
237 if (bus_pid_changed(bus
))
250 free(bus
->exec_path
);
251 strv_free(bus
->exec_argv
);
259 int sd_bus_set_bus_client(sd_bus
*bus
, int b
) {
262 if (bus
->state
!= BUS_UNSET
)
264 if (bus_pid_changed(bus
))
267 bus
->bus_client
= !!b
;
271 int sd_bus_negotiate_fds(sd_bus
*bus
, int b
) {
274 if (bus
->state
!= BUS_UNSET
)
276 if (bus_pid_changed(bus
))
279 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ACCEPT_FD
, b
);
283 int sd_bus_negotiate_attach_comm(sd_bus
*bus
, int b
) {
286 if (bus
->state
!= BUS_UNSET
)
288 if (bus_pid_changed(bus
))
291 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_COMM
, b
);
295 int sd_bus_negotiate_attach_exe(sd_bus
*bus
, int b
) {
298 if (bus
->state
!= BUS_UNSET
)
300 if (bus_pid_changed(bus
))
303 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_EXE
, b
);
307 int sd_bus_negotiate_attach_cmdline(sd_bus
*bus
, int b
) {
310 if (bus
->state
!= BUS_UNSET
)
312 if (bus_pid_changed(bus
))
315 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_CMDLINE
, b
);
319 int sd_bus_negotiate_attach_cgroup(sd_bus
*bus
, int b
) {
322 if (bus
->state
!= BUS_UNSET
)
324 if (bus_pid_changed(bus
))
327 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_CGROUP
, b
);
331 int sd_bus_negotiate_attach_caps(sd_bus
*bus
, int b
) {
334 if (bus
->state
!= BUS_UNSET
)
336 if (bus_pid_changed(bus
))
339 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_CAPS
, b
);
343 int sd_bus_negotiate_attach_selinux_context(sd_bus
*bus
, int b
) {
346 if (bus
->state
!= BUS_UNSET
)
348 if (bus_pid_changed(bus
))
351 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_SECLABEL
, b
);
355 int sd_bus_negotiate_attach_audit(sd_bus
*bus
, int b
) {
358 if (bus
->state
!= BUS_UNSET
)
360 if (bus_pid_changed(bus
))
363 SET_FLAG(bus
->hello_flags
, KDBUS_HELLO_ATTACH_AUDIT
, b
);
367 int sd_bus_set_server(sd_bus
*bus
, int b
, sd_id128_t server_id
) {
370 if (!b
&& !sd_id128_equal(server_id
, SD_ID128_NULL
))
372 if (bus
->state
!= BUS_UNSET
)
374 if (bus_pid_changed(bus
))
377 bus
->is_server
= !!b
;
378 bus
->server_id
= server_id
;
382 int sd_bus_set_anonymous(sd_bus
*bus
, int b
) {
385 if (bus
->state
!= BUS_UNSET
)
387 if (bus_pid_changed(bus
))
390 bus
->anonymous_auth
= !!b
;
394 static int hello_callback(sd_bus
*bus
, sd_bus_message
*reply
, void *userdata
) {
399 assert(bus
->state
== BUS_HELLO
);
402 r
= bus_message_to_errno(reply
);
406 r
= sd_bus_message_read(reply
, "s", &s
);
410 if (!service_name_is_valid(s
) || s
[0] != ':')
413 bus
->unique_name
= strdup(s
);
414 if (!bus
->unique_name
)
417 bus
->state
= BUS_RUNNING
;
422 static int bus_send_hello(sd_bus
*bus
) {
423 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
428 if (!bus
->bus_client
|| bus
->is_kernel
)
431 r
= sd_bus_message_new_method_call(
433 "org.freedesktop.DBus",
435 "org.freedesktop.DBus",
441 return sd_bus_send_with_reply(bus
, m
, hello_callback
, NULL
, 0, &bus
->hello_serial
);
444 int bus_start_running(sd_bus
*bus
) {
447 if (bus
->bus_client
&& !bus
->is_kernel
) {
448 bus
->state
= BUS_HELLO
;
452 bus
->state
= BUS_RUNNING
;
456 static int parse_address_key(const char **p
, const char *key
, char **value
) {
467 if (strncmp(*p
, key
, l
) != 0)
480 while (*a
!= ';' && *a
!= ',' && *a
!= 0) {
498 c
= (char) ((x
<< 4) | y
);
505 t
= realloc(r
, n
+ 2);
533 static void skip_address_key(const char **p
) {
537 *p
+= strcspn(*p
, ",");
543 static int parse_unix_address(sd_bus
*b
, const char **p
, char **guid
) {
544 _cleanup_free_
char *path
= NULL
, *abstract
= NULL
;
553 while (**p
!= 0 && **p
!= ';') {
554 r
= parse_address_key(p
, "guid", guid
);
560 r
= parse_address_key(p
, "path", &path
);
566 r
= parse_address_key(p
, "abstract", &abstract
);
575 if (!path
&& !abstract
)
578 if (path
&& abstract
)
583 if (l
> sizeof(b
->sockaddr
.un
.sun_path
))
586 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
587 strncpy(b
->sockaddr
.un
.sun_path
, path
, sizeof(b
->sockaddr
.un
.sun_path
));
588 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
;
589 } else if (abstract
) {
590 l
= strlen(abstract
);
591 if (l
> sizeof(b
->sockaddr
.un
.sun_path
) - 1)
594 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
595 b
->sockaddr
.un
.sun_path
[0] = 0;
596 strncpy(b
->sockaddr
.un
.sun_path
+1, abstract
, sizeof(b
->sockaddr
.un
.sun_path
)-1);
597 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + 1 + l
;
603 static int parse_tcp_address(sd_bus
*b
, const char **p
, char **guid
) {
604 _cleanup_free_
char *host
= NULL
, *port
= NULL
, *family
= NULL
;
606 struct addrinfo
*result
, hints
= {
607 .ai_socktype
= SOCK_STREAM
,
608 .ai_flags
= AI_ADDRCONFIG
,
616 while (**p
!= 0 && **p
!= ';') {
617 r
= parse_address_key(p
, "guid", guid
);
623 r
= parse_address_key(p
, "host", &host
);
629 r
= parse_address_key(p
, "port", &port
);
635 r
= parse_address_key(p
, "family", &family
);
648 if (streq(family
, "ipv4"))
649 hints
.ai_family
= AF_INET
;
650 else if (streq(family
, "ipv6"))
651 hints
.ai_family
= AF_INET6
;
656 r
= getaddrinfo(host
, port
, &hints
, &result
);
660 return -EADDRNOTAVAIL
;
662 memcpy(&b
->sockaddr
, result
->ai_addr
, result
->ai_addrlen
);
663 b
->sockaddr_size
= result
->ai_addrlen
;
665 freeaddrinfo(result
);
670 static int parse_exec_address(sd_bus
*b
, const char **p
, char **guid
) {
672 unsigned n_argv
= 0, j
;
681 while (**p
!= 0 && **p
!= ';') {
682 r
= parse_address_key(p
, "guid", guid
);
688 r
= parse_address_key(p
, "path", &path
);
694 if (startswith(*p
, "argv")) {
698 ul
= strtoul(*p
+ 4, (char**) p
, 10);
699 if (errno
> 0 || **p
!= '=' || ul
> 256) {
709 x
= realloc(argv
, sizeof(char*) * (ul
+ 2));
715 memset(x
+ n_argv
, 0, sizeof(char*) * (ul
- n_argv
+ 2));
721 r
= parse_address_key(p
, NULL
, argv
+ ul
);
736 /* Make sure there are no holes in the array, with the
737 * exception of argv[0] */
738 for (j
= 1; j
< n_argv
; j
++)
744 if (argv
&& argv
[0] == NULL
) {
745 argv
[0] = strdup(path
);
757 for (j
= 0; j
< n_argv
; j
++)
765 static int parse_kernel_address(sd_bus
*b
, const char **p
, char **guid
) {
766 _cleanup_free_
char *path
= NULL
;
774 while (**p
!= 0 && **p
!= ';') {
775 r
= parse_address_key(p
, "guid", guid
);
781 r
= parse_address_key(p
, "path", &path
);
800 static void bus_reset_parsed_address(sd_bus
*b
) {
804 b
->sockaddr_size
= 0;
805 strv_free(b
->exec_argv
);
809 b
->server_id
= SD_ID128_NULL
;
814 static int bus_parse_next_address(sd_bus
*b
) {
815 _cleanup_free_
char *guid
= NULL
;
823 if (b
->address
[b
->address_index
] == 0)
826 bus_reset_parsed_address(b
);
828 a
= b
->address
+ b
->address_index
;
837 if (startswith(a
, "unix:")) {
840 r
= parse_unix_address(b
, &a
, &guid
);
845 } else if (startswith(a
, "tcp:")) {
848 r
= parse_tcp_address(b
, &a
, &guid
);
854 } else if (startswith(a
, "unixexec:")) {
857 r
= parse_exec_address(b
, &a
, &guid
);
863 } else if (startswith(a
, "kernel:")) {
866 r
= parse_kernel_address(b
, &a
, &guid
);
879 r
= sd_id128_from_string(guid
, &b
->server_id
);
884 b
->address_index
= a
- b
->address
;
888 static int bus_start_address(sd_bus
*b
) {
896 if (b
->sockaddr
.sa
.sa_family
!= AF_UNSPEC
) {
898 r
= bus_socket_connect(b
);
902 b
->last_connect_error
= -r
;
904 } else if (b
->exec_path
) {
906 r
= bus_socket_exec(b
);
910 b
->last_connect_error
= -r
;
911 } else if (b
->kernel
) {
913 r
= bus_kernel_connect(b
);
917 b
->last_connect_error
= -r
;
920 r
= bus_parse_next_address(b
);
924 return b
->last_connect_error
? -b
->last_connect_error
: -ECONNREFUSED
;
928 int bus_next_address(sd_bus
*b
) {
931 bus_reset_parsed_address(b
);
932 return bus_start_address(b
);
935 static int bus_start_fd(sd_bus
*b
) {
940 assert(b
->input_fd
>= 0);
941 assert(b
->output_fd
>= 0);
943 r
= fd_nonblock(b
->input_fd
, true);
947 r
= fd_cloexec(b
->input_fd
, true);
951 if (b
->input_fd
!= b
->output_fd
) {
952 r
= fd_nonblock(b
->output_fd
, true);
956 r
= fd_cloexec(b
->output_fd
, true);
961 if (fstat(b
->input_fd
, &st
) < 0)
964 if (S_ISCHR(b
->input_fd
))
965 return bus_kernel_take_fd(b
);
967 return bus_socket_take_fd(b
);
970 int sd_bus_start(sd_bus
*bus
) {
975 if (bus
->state
!= BUS_UNSET
)
977 if (bus_pid_changed(bus
))
980 bus
->state
= BUS_OPENING
;
982 if (bus
->is_server
&& bus
->bus_client
)
985 if (bus
->input_fd
>= 0)
986 r
= bus_start_fd(bus
);
987 else if (bus
->address
|| bus
->sockaddr
.sa
.sa_family
!= AF_UNSPEC
|| bus
->exec_path
|| bus
->kernel
)
988 r
= bus_start_address(bus
);
995 return bus_send_hello(bus
);
998 int sd_bus_open_system(sd_bus
**ret
) {
1010 e
= secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1012 r
= sd_bus_set_address(b
, e
);
1016 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
1017 strncpy(b
->sockaddr
.un
.sun_path
, "/run/dbus/system_bus_socket", sizeof(b
->sockaddr
.un
.sun_path
));
1018 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + sizeof("/run/dbus/system_bus_socket") - 1;
1021 b
->bus_client
= true;
1023 r
= sd_bus_start(b
);
1035 int sd_bus_open_user(sd_bus
**ret
) {
1048 e
= secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1050 r
= sd_bus_set_address(b
, e
);
1054 e
= secure_getenv("XDG_RUNTIME_DIR");
1061 if (l
+ 4 > sizeof(b
->sockaddr
.un
.sun_path
)) {
1066 b
->sockaddr
.un
.sun_family
= AF_UNIX
;
1067 memcpy(mempcpy(b
->sockaddr
.un
.sun_path
, e
, l
), "/bus", 4);
1068 b
->sockaddr_size
= offsetof(struct sockaddr_un
, sun_path
) + l
+ 4;
1071 b
->bus_client
= true;
1073 r
= sd_bus_start(b
);
1085 void sd_bus_close(sd_bus
*bus
) {
1088 if (bus
->state
== BUS_CLOSED
)
1090 if (bus_pid_changed(bus
))
1093 bus
->state
= BUS_CLOSED
;
1095 if (!bus
->is_kernel
)
1098 /* We'll leave the fd open in case this is a kernel bus, since
1099 * there might still be memblocks around that reference this
1100 * bus, and they might need to invoke the
1101 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1105 sd_bus
*sd_bus_ref(sd_bus
*bus
) {
1109 assert_se(REFCNT_INC(bus
->n_ref
) >= 2);
1114 sd_bus
*sd_bus_unref(sd_bus
*bus
) {
1118 if (REFCNT_DEC(bus
->n_ref
) <= 0)
1124 int sd_bus_is_open(sd_bus
*bus
) {
1127 if (bus_pid_changed(bus
))
1130 return BUS_IS_OPEN(bus
->state
);
1133 int sd_bus_can_send(sd_bus
*bus
, char type
) {
1138 if (bus
->state
== BUS_UNSET
)
1140 if (bus_pid_changed(bus
))
1143 if (type
== SD_BUS_TYPE_UNIX_FD
) {
1144 if (!(bus
->hello_flags
& KDBUS_HELLO_ACCEPT_FD
))
1147 r
= bus_ensure_running(bus
);
1151 return bus
->can_fds
;
1154 return bus_type_is_valid(type
);
1157 int sd_bus_get_server_id(sd_bus
*bus
, sd_id128_t
*server_id
) {
1164 if (bus_pid_changed(bus
))
1167 r
= bus_ensure_running(bus
);
1171 *server_id
= bus
->server_id
;
1175 static int bus_seal_message(sd_bus
*b
, sd_bus_message
*m
) {
1178 if (m
->header
->version
> b
->message_version
)
1184 return bus_message_seal(m
, ++b
->serial
);
1187 static int dispatch_wqueue(sd_bus
*bus
) {
1191 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
1193 while (bus
->wqueue_size
> 0) {
1196 r
= bus_kernel_write_message(bus
, bus
->wqueue
[0]);
1198 r
= bus_socket_write_message(bus
, bus
->wqueue
[0], &bus
->windex
);
1204 /* Didn't do anything this time */
1206 else if (bus
->is_kernel
|| bus
->windex
>= BUS_MESSAGE_SIZE(bus
->wqueue
[0])) {
1207 /* Fully written. Let's drop the entry from
1210 * This isn't particularly optimized, but
1211 * well, this is supposed to be our worst-case
1212 * buffer only, and the socket buffer is
1213 * supposed to be our primary buffer, and if
1214 * it got full, then all bets are off
1217 sd_bus_message_unref(bus
->wqueue
[0]);
1218 bus
->wqueue_size
--;
1219 memmove(bus
->wqueue
, bus
->wqueue
+ 1, sizeof(sd_bus_message
*) * bus
->wqueue_size
);
1229 static int dispatch_rqueue(sd_bus
*bus
, sd_bus_message
**m
) {
1230 sd_bus_message
*z
= NULL
;
1235 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
1237 if (bus
->rqueue_size
> 0) {
1238 /* Dispatch a queued message */
1240 *m
= bus
->rqueue
[0];
1241 bus
->rqueue_size
--;
1242 memmove(bus
->rqueue
, bus
->rqueue
+ 1, sizeof(sd_bus_message
*) * bus
->rqueue_size
);
1246 /* Try to read a new message */
1249 r
= bus_kernel_read_message(bus
, &z
);
1251 r
= bus_socket_read_message(bus
, &z
);
1267 int sd_bus_send(sd_bus
*bus
, sd_bus_message
*m
, uint64_t *serial
) {
1272 if (!BUS_IS_OPEN(bus
->state
))
1276 if (bus_pid_changed(bus
))
1280 r
= sd_bus_can_send(bus
, SD_BUS_TYPE_UNIX_FD
);
1287 /* If the serial number isn't kept, then we know that no reply
1289 if (!serial
&& !m
->sealed
)
1290 m
->header
->flags
|= SD_BUS_MESSAGE_NO_REPLY_EXPECTED
;
1292 r
= bus_seal_message(bus
, m
);
1296 /* If this is a reply and no reply was requested, then let's
1297 * suppress this, if we can */
1298 if (m
->dont_send
&& !serial
)
1301 if ((bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) && bus
->wqueue_size
<= 0) {
1305 r
= bus_kernel_write_message(bus
, m
);
1307 r
= bus_socket_write_message(bus
, m
, &idx
);
1312 } else if (!bus
->is_kernel
&& idx
< BUS_MESSAGE_SIZE(m
)) {
1313 /* Wasn't fully written. So let's remember how
1314 * much was written. Note that the first entry
1315 * of the wqueue array is always allocated so
1316 * that we always can remember how much was
1318 bus
->wqueue
[0] = sd_bus_message_ref(m
);
1319 bus
->wqueue_size
= 1;
1325 /* Just append it to the queue. */
1327 if (bus
->wqueue_size
>= BUS_WQUEUE_MAX
)
1330 q
= realloc(bus
->wqueue
, sizeof(sd_bus_message
*) * (bus
->wqueue_size
+ 1));
1335 q
[bus
->wqueue_size
++] = sd_bus_message_ref(m
);
1339 *serial
= BUS_MESSAGE_SERIAL(m
);
1344 static usec_t
calc_elapse(uint64_t usec
) {
1345 if (usec
== (uint64_t) -1)
1349 usec
= BUS_DEFAULT_TIMEOUT
;
1351 return now(CLOCK_MONOTONIC
) + usec
;
1354 static int timeout_compare(const void *a
, const void *b
) {
1355 const struct reply_callback
*x
= a
, *y
= b
;
1357 if (x
->timeout
!= 0 && y
->timeout
== 0)
1360 if (x
->timeout
== 0 && y
->timeout
!= 0)
1363 if (x
->timeout
< y
->timeout
)
1366 if (x
->timeout
> y
->timeout
)
1372 int sd_bus_send_with_reply(
1375 sd_bus_message_handler_t callback
,
1380 struct reply_callback
*c
;
1385 if (!BUS_IS_OPEN(bus
->state
))
1391 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1393 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1395 if (bus_pid_changed(bus
))
1398 r
= hashmap_ensure_allocated(&bus
->reply_callbacks
, uint64_hash_func
, uint64_compare_func
);
1402 if (usec
!= (uint64_t) -1) {
1403 r
= prioq_ensure_allocated(&bus
->reply_callbacks_prioq
, timeout_compare
);
1408 r
= bus_seal_message(bus
, m
);
1412 c
= new0(struct reply_callback
, 1);
1416 c
->callback
= callback
;
1417 c
->userdata
= userdata
;
1418 c
->serial
= BUS_MESSAGE_SERIAL(m
);
1419 c
->timeout
= calc_elapse(usec
);
1421 r
= hashmap_put(bus
->reply_callbacks
, &c
->serial
, c
);
1427 if (c
->timeout
!= 0) {
1428 r
= prioq_put(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1431 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1436 r
= sd_bus_send(bus
, m
, serial
);
1438 sd_bus_send_with_reply_cancel(bus
, c
->serial
);
1445 int sd_bus_send_with_reply_cancel(sd_bus
*bus
, uint64_t serial
) {
1446 struct reply_callback
*c
;
1452 if (bus_pid_changed(bus
))
1455 c
= hashmap_remove(bus
->reply_callbacks
, &serial
);
1459 if (c
->timeout
!= 0)
1460 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1466 int bus_ensure_running(sd_bus
*bus
) {
1471 if (bus
->state
== BUS_UNSET
|| bus
->state
== BUS_CLOSED
)
1473 if (bus
->state
== BUS_RUNNING
)
1477 r
= sd_bus_process(bus
, NULL
);
1480 if (bus
->state
== BUS_RUNNING
)
1485 r
= sd_bus_wait(bus
, (uint64_t) -1);
1491 int sd_bus_send_with_reply_and_block(
1495 sd_bus_error
*error
,
1496 sd_bus_message
**reply
) {
1505 if (!BUS_IS_OPEN(bus
->state
))
1509 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1511 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1513 if (bus_error_is_dirty(error
))
1515 if (bus_pid_changed(bus
))
1518 r
= bus_ensure_running(bus
);
1522 r
= sd_bus_send(bus
, m
, &serial
);
1526 timeout
= calc_elapse(usec
);
1530 sd_bus_message
*incoming
= NULL
;
1535 if (bus
->rqueue_size
>= BUS_RQUEUE_MAX
)
1538 /* Make sure there's room for queuing this
1539 * locally, before we read the message */
1541 q
= realloc(bus
->rqueue
, (bus
->rqueue_size
+ 1) * sizeof(sd_bus_message
*));
1550 r
= bus_kernel_read_message(bus
, &incoming
);
1552 r
= bus_socket_read_message(bus
, &incoming
);
1557 if (incoming
->reply_serial
== serial
) {
1558 /* Found a match! */
1560 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_RETURN
) {
1565 sd_bus_message_unref(incoming
);
1570 if (incoming
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_ERROR
) {
1573 r
= sd_bus_error_copy(error
, &incoming
->error
);
1575 sd_bus_message_unref(incoming
);
1579 k
= bus_error_to_errno(&incoming
->error
);
1580 sd_bus_message_unref(incoming
);
1584 sd_bus_message_unref(incoming
);
1588 /* There's already guaranteed to be room for
1589 * this, so need to resize things here */
1590 bus
->rqueue
[bus
->rqueue_size
++] = incoming
;
1593 /* Try to read more, right-away */
1602 n
= now(CLOCK_MONOTONIC
);
1608 left
= (uint64_t) -1;
1610 r
= bus_poll(bus
, true, left
);
1614 r
= dispatch_wqueue(bus
);
1620 int sd_bus_get_fd(sd_bus
*bus
) {
1623 if (!BUS_IS_OPEN(bus
->state
))
1625 if (bus
->input_fd
!= bus
->output_fd
)
1627 if (bus_pid_changed(bus
))
1630 return bus
->input_fd
;
1633 int sd_bus_get_events(sd_bus
*bus
) {
1638 if (!BUS_IS_OPEN(bus
->state
))
1640 if (bus_pid_changed(bus
))
1643 if (bus
->state
== BUS_OPENING
)
1645 else if (bus
->state
== BUS_AUTHENTICATING
) {
1647 if (bus_socket_auth_needs_write(bus
))
1652 } else if (bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
) {
1653 if (bus
->rqueue_size
<= 0)
1655 if (bus
->wqueue_size
> 0)
1662 int sd_bus_get_timeout(sd_bus
*bus
, uint64_t *timeout_usec
) {
1663 struct reply_callback
*c
;
1669 if (!BUS_IS_OPEN(bus
->state
))
1671 if (bus_pid_changed(bus
))
1674 if (bus
->state
== BUS_AUTHENTICATING
) {
1675 *timeout_usec
= bus
->auth_timeout
;
1679 if (bus
->state
!= BUS_RUNNING
&& bus
->state
!= BUS_HELLO
) {
1680 *timeout_usec
= (uint64_t) -1;
1684 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1686 *timeout_usec
= (uint64_t) -1;
1690 *timeout_usec
= c
->timeout
;
1694 static int process_timeout(sd_bus
*bus
) {
1695 _cleanup_bus_message_unref_ sd_bus_message
* m
= NULL
;
1696 struct reply_callback
*c
;
1702 c
= prioq_peek(bus
->reply_callbacks_prioq
);
1706 n
= now(CLOCK_MONOTONIC
);
1710 r
= bus_message_new_synthetic_error(
1713 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1718 assert_se(prioq_pop(bus
->reply_callbacks_prioq
) == c
);
1719 hashmap_remove(bus
->reply_callbacks
, &c
->serial
);
1721 r
= c
->callback(bus
, m
, c
->userdata
);
1724 return r
< 0 ? r
: 1;
1727 static int process_hello(sd_bus
*bus
, sd_bus_message
*m
) {
1731 if (bus
->state
!= BUS_HELLO
)
1734 /* Let's make sure the first message on the bus is the HELLO
1735 * reply. But note that we don't actually parse the message
1736 * here (we leave that to the usual handling), we just verify
1737 * we don't let any earlier msg through. */
1739 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_RETURN
&&
1740 m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_ERROR
)
1743 if (m
->reply_serial
!= bus
->hello_serial
)
1749 static int process_reply(sd_bus
*bus
, sd_bus_message
*m
) {
1750 struct reply_callback
*c
;
1756 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_RETURN
&&
1757 m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_ERROR
)
1760 c
= hashmap_remove(bus
->reply_callbacks
, &m
->reply_serial
);
1764 if (c
->timeout
!= 0)
1765 prioq_remove(bus
->reply_callbacks_prioq
, c
, &c
->prioq_idx
);
1767 r
= sd_bus_message_rewind(m
, true);
1771 r
= c
->callback(bus
, m
, c
->userdata
);
1777 static int process_filter(sd_bus
*bus
, sd_bus_message
*m
) {
1778 struct filter_callback
*l
;
1785 bus
->filter_callbacks_modified
= false;
1787 LIST_FOREACH(callbacks
, l
, bus
->filter_callbacks
) {
1789 if (bus
->filter_callbacks_modified
)
1792 /* Don't run this more than once per iteration */
1793 if (l
->last_iteration
== bus
->iteration_counter
)
1796 l
->last_iteration
= bus
->iteration_counter
;
1798 r
= sd_bus_message_rewind(m
, true);
1802 r
= l
->callback(bus
, m
, l
->userdata
);
1808 } while (bus
->filter_callbacks_modified
);
1813 static int process_match(sd_bus
*bus
, sd_bus_message
*m
) {
1820 bus
->match_callbacks_modified
= false;
1822 r
= bus_match_run(bus
, &bus
->match_callbacks
, m
);
1826 } while (bus
->match_callbacks_modified
);
1831 static int process_builtin(sd_bus
*bus
, sd_bus_message
*m
) {
1832 _cleanup_bus_message_unref_ sd_bus_message
*reply
= NULL
;
1838 if (m
->header
->type
!= SD_BUS_MESSAGE_TYPE_METHOD_CALL
)
1841 if (!streq_ptr(m
->interface
, "org.freedesktop.DBus.Peer"))
1844 if (m
->header
->flags
& SD_BUS_MESSAGE_NO_REPLY_EXPECTED
)
1847 if (streq_ptr(m
->member
, "Ping"))
1848 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1849 else if (streq_ptr(m
->member
, "GetMachineId")) {
1853 r
= sd_id128_get_machine(&id
);
1857 r
= sd_bus_message_new_method_return(bus
, m
, &reply
);
1861 r
= sd_bus_message_append(reply
, "s", sd_id128_to_string(id
, sid
));
1863 r
= sd_bus_message_new_method_errorf(
1865 "org.freedesktop.DBus.Error.UnknownMethod",
1866 "Unknown method '%s' on interface '%s'.", m
->member
, m
->interface
);
1872 r
= sd_bus_send(bus
, reply
, NULL
);
1879 static int process_message(sd_bus
*bus
, sd_bus_message
*m
) {
1885 bus
->iteration_counter
++;
1887 r
= process_hello(bus
, m
);
1891 r
= process_reply(bus
, m
);
1895 r
= process_filter(bus
, m
);
1899 r
= process_match(bus
, m
);
1903 r
= process_builtin(bus
, m
);
1907 return bus_process_object(bus
, m
);
1910 static int process_running(sd_bus
*bus
, sd_bus_message
**ret
) {
1911 _cleanup_bus_message_unref_ sd_bus_message
*m
= NULL
;
1915 assert(bus
->state
== BUS_RUNNING
|| bus
->state
== BUS_HELLO
);
1917 r
= process_timeout(bus
);
1921 r
= dispatch_wqueue(bus
);
1925 r
= dispatch_rqueue(bus
, &m
);
1931 r
= process_message(bus
, m
);
1936 r
= sd_bus_message_rewind(m
, true);
1945 if (m
->header
->type
== SD_BUS_MESSAGE_TYPE_METHOD_CALL
) {
1947 r
= sd_bus_reply_method_errorf(
1949 "org.freedesktop.DBus.Error.UnknownObject",
1950 "Unknown object '%s'.", m
->path
);
1964 int sd_bus_process(sd_bus
*bus
, sd_bus_message
**ret
) {
1967 /* Returns 0 when we didn't do anything. This should cause the
1968 * caller to invoke sd_bus_wait() before returning the next
1969 * time. Returns > 0 when we did something, which possibly
1970 * means *ret is filled in with an unprocessed message. */
1974 if (bus_pid_changed(bus
))
1977 /* We don't allow recursively invoking sd_bus_process(). */
1978 if (bus
->processing
)
1981 switch (bus
->state
) {
1988 r
= bus_socket_process_opening(bus
);
1995 case BUS_AUTHENTICATING
:
1997 r
= bus_socket_process_authenticating(bus
);
2007 bus
->processing
= true;
2008 r
= process_running(bus
, ret
);
2009 bus
->processing
= false;
2014 assert_not_reached("Unknown state");
2017 static int bus_poll(sd_bus
*bus
, bool need_more
, uint64_t timeout_usec
) {
2018 struct pollfd p
[2] = {};
2025 if (!BUS_IS_OPEN(bus
->state
))
2028 e
= sd_bus_get_events(bus
);
2035 r
= sd_bus_get_timeout(bus
, &until
);
2042 nw
= now(CLOCK_MONOTONIC
);
2043 m
= until
> nw
? until
- nw
: 0;
2046 if (timeout_usec
!= (uint64_t) -1 && (m
== (uint64_t) -1 || timeout_usec
< m
))
2049 p
[0].fd
= bus
->input_fd
;
2050 if (bus
->output_fd
== bus
->input_fd
) {
2054 p
[0].events
= e
& POLLIN
;
2055 p
[1].fd
= bus
->output_fd
;
2056 p
[1].events
= e
& POLLOUT
;
2060 r
= ppoll(p
, n
, m
== (uint64_t) -1 ? NULL
: timespec_store(&ts
, m
), NULL
);
2064 return r
> 0 ? 1 : 0;
2067 int sd_bus_wait(sd_bus
*bus
, uint64_t timeout_usec
) {
2071 if (!BUS_IS_OPEN(bus
->state
))
2073 if (bus_pid_changed(bus
))
2076 if (bus
->rqueue_size
> 0)
2079 return bus_poll(bus
, false, timeout_usec
);
2082 int sd_bus_flush(sd_bus
*bus
) {
2087 if (!BUS_IS_OPEN(bus
->state
))
2089 if (bus_pid_changed(bus
))
2092 r
= bus_ensure_running(bus
);
2096 if (bus
->wqueue_size
<= 0)
2100 r
= dispatch_wqueue(bus
);
2104 if (bus
->wqueue_size
<= 0)
2107 r
= bus_poll(bus
, false, (uint64_t) -1);
2113 int sd_bus_add_filter(sd_bus
*bus
, sd_bus_message_handler_t callback
, void *userdata
) {
2114 struct filter_callback
*f
;
2120 if (bus_pid_changed(bus
))
2123 f
= new0(struct filter_callback
, 1);
2126 f
->callback
= callback
;
2127 f
->userdata
= userdata
;
2129 bus
->filter_callbacks_modified
= true;
2130 LIST_PREPEND(callbacks
, bus
->filter_callbacks
, f
);
2134 int sd_bus_remove_filter(sd_bus
*bus
, sd_bus_message_handler_t callback
, void *userdata
) {
2135 struct filter_callback
*f
;
2141 if (bus_pid_changed(bus
))
2144 LIST_FOREACH(callbacks
, f
, bus
->filter_callbacks
) {
2145 if (f
->callback
== callback
&& f
->userdata
== userdata
) {
2146 bus
->filter_callbacks_modified
= true;
2147 LIST_REMOVE(callbacks
, bus
->filter_callbacks
, f
);
2156 int sd_bus_add_match(sd_bus
*bus
, const char *match
, sd_bus_message_handler_t callback
, void *userdata
) {
2157 struct bus_match_component
*components
= NULL
;
2158 unsigned n_components
= 0;
2159 uint64_t cookie
= 0;
2166 if (bus_pid_changed(bus
))
2169 r
= bus_match_parse(match
, &components
, &n_components
);
2173 if (bus
->bus_client
) {
2174 cookie
= ++bus
->match_cookie
;
2176 r
= bus_add_match_internal(bus
, match
, components
, n_components
, cookie
);
2181 bus
->match_callbacks_modified
= true;
2182 r
= bus_match_add(&bus
->match_callbacks
, components
, n_components
, callback
, userdata
, cookie
, NULL
);
2184 if (bus
->bus_client
)
2185 bus_remove_match_internal(bus
, match
, cookie
);
2189 bus_match_parse_free(components
, n_components
);
2193 int sd_bus_remove_match(sd_bus
*bus
, const char *match
, sd_bus_message_handler_t callback
, void *userdata
) {
2194 struct bus_match_component
*components
= NULL
;
2195 unsigned n_components
= 0;
2197 uint64_t cookie
= 0;
2203 if (bus_pid_changed(bus
))
2206 r
= bus_match_parse(match
, &components
, &n_components
);
2210 bus
->match_callbacks_modified
= true;
2211 r
= bus_match_remove(&bus
->match_callbacks
, components
, n_components
, callback
, userdata
, &cookie
);
2213 if (bus
->bus_client
)
2214 q
= bus_remove_match_internal(bus
, match
, cookie
);
2216 bus_match_parse_free(components
, n_components
);
2218 return r
< 0 ? r
: q
;
2221 bool bus_pid_changed(sd_bus
*bus
) {
2224 /* We don't support people creating a bus connection and
2225 * keeping it around over a fork(). Let's complain. */
2227 return bus
->original_pid
!= getpid();