return ret;
}
-static int bus_read_message(sd_bus *bus) {
+static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
assert(bus);
if (bus->is_kernel)
- return bus_kernel_read_message(bus);
+ return bus_kernel_read_message(bus, hint_priority, priority);
else
return bus_socket_read_message(bus);
}
return 0;
}
-static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
+static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
int r, ret = 0;
assert(bus);
assert(m);
assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
+ /* Note that the priority logic is only available on kdbus,
+ * where the rqueue is unused. We check the rqueue here
+ * anyway, because it's simple... */
+
for (;;) {
if (bus->rqueue_size > 0) {
/* Dispatch a queued message */
}
/* Try to read a new message */
- r = bus_read_message(bus);
+ r = bus_read_message(bus, hint_priority, priority);
if (r < 0)
return r;
if (r == 0)
i++;
}
- r = bus_read_message(bus);
+ r = bus_read_message(bus, false, 0);
if (r < 0) {
if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
bus_enter_closing(bus);
return r;
}
-static int process_running(sd_bus *bus, sd_bus_message **ret) {
+static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
_cleanup_bus_message_unref_ sd_bus_message *m = NULL;
int r;
if (r != 0)
goto null_message;
- r = dispatch_rqueue(bus, &m);
+ r = dispatch_rqueue(bus, hint_priority, priority, &m);
if (r < 0)
return r;
if (!m)
return r;
}
-_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
+static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
BUS_DONT_DESTROY(bus);
int r;
case BUS_RUNNING:
case BUS_HELLO:
- r = process_running(bus, ret);
+ r = process_running(bus, hint_priority, priority, ret);
if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
bus_enter_closing(bus);
r = 1;
assert_not_reached("Unknown state");
}
+_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
+ return bus_process_internal(bus, false, 0, ret);
+}
+
+_public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
+ return bus_process_internal(bus, true, priority, ret);
+}
+
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
struct pollfd p[2] = {};
int r, e, n;