From e0a085811de1d1976c019afcfc2e4e74f590cc9f Mon Sep 17 00:00:00 2001
From: Lennart Poettering
Date: Tue, 13 Feb 2018 18:30:34 +0100
Subject: [PATCH] core: don't process dbus unit and job queue when there are
 already too many messages pending

We maintain a queue of units and jobs for which we are supposed to
generate change/new notifications, because they were either just
created or some of their properties changed. Let's throttle processing
of this queue a bit: as soon as more than 1K bus messages are queued
for writing, skip processing the queue and recheck on the next
iteration. Moreover, never process more than 100 units in one go;
return to the event loop after that. Together, both limits put
effective bounds on the space and time usage of the function, delaying
further work until a later moment, when the queue is empty or the
event loop is sufficiently idle again. This should keep the number of
generated messages much lower than before on busy systems or when some
client is hanging.

Note that this also means a bad client can slow down message
dispatching substantially for up to 90s if it likes to, for all
clients. But that should be acceptable, as we only allow trusted bus
clients anyway.

Fixes: #8166
---
 src/core/dbus.c      | 31 ++++++++++++++++++++++++++++
 src/core/dbus.h      |  2 ++
 src/core/manager.c   | 49 ++++++++++++++++++++++++++++++++++++--------
 src/systemd/sd-bus.h |  3 +++
 4 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/src/core/dbus.c b/src/core/dbus.c
index d023c8715c2..56b43adcdaa 100644
--- a/src/core/dbus.c
+++ b/src/core/dbus.c
@@ -1279,3 +1279,34 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
 int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error) {
         return bus_verify_polkit_async(call, CAP_SYS_ADMIN, "org.freedesktop.systemd1.set-environment", NULL, false, UID_INVALID, &m->polkit_registry, error);
 }
+
+uint64_t manager_bus_n_queued_write(Manager *m) {
+        uint64_t c = 0;
+        Iterator i;
+        sd_bus *b;
+        int r;
+
+        /* Returns the total number of messages queued for writing on all our direct and API busses. */
+
+        SET_FOREACH(b, m->private_buses, i) {
+                uint64_t k;
+
+                r = sd_bus_get_n_queued_write(b, &k);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to query queued messages for private bus: %m");
+                else
+                        c += k;
+        }
+
+        if (m->api_bus) {
+                uint64_t k;
+
+                r = sd_bus_get_n_queued_write(m->api_bus, &k);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to query queued messages for API bus: %m");
+                else
+                        c += k;
+        }
+
+        return c;
+}
diff --git a/src/core/dbus.h b/src/core/dbus.h
index 3051ec06658..702bc48ae18 100644
--- a/src/core/dbus.h
+++ b/src/core/dbus.h
@@ -48,3 +48,5 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
 int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error);
 
 int bus_forward_agent_released(Manager *m, const char *path);
+
+uint64_t manager_bus_n_queued_write(Manager *m);
diff --git a/src/core/manager.c b/src/core/manager.c
index ebac47024c7..84adb9c6660 100644
--- a/src/core/manager.c
+++ b/src/core/manager.c
@@ -102,6 +102,13 @@
 #define JOBS_IN_PROGRESS_PERIOD_USEC (USEC_PER_SEC / 3)
 #define JOBS_IN_PROGRESS_PERIOD_DIVISOR 3
 
+/* If there are more than 1K bus messages queued across our API and direct busses, then let's not add more on top until
+ * the queue gets a bit emptier. */
+#define MANAGER_BUS_BUSY_THRESHOLD 1024LU
+
+/* How many units and jobs to process from the bus queue in one go before returning to the event loop. */
+#define MANAGER_BUS_MESSAGE_BUDGET 100U
+
 static int manager_dispatch_notify_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
 static int manager_dispatch_cgroups_agent_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
 static int manager_dispatch_signal_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
@@ -1886,41 +1893,65 @@ static int manager_dispatch_run_queue(sd_event_source *source, void *userdata) {
 }
 
 static unsigned manager_dispatch_dbus_queue(Manager *m) {
-        Job *j;
+        unsigned n = 0, budget;
         Unit *u;
-        unsigned n = 0;
+        Job *j;
 
         assert(m);
 
         if (m->dispatching_dbus_queue)
                 return 0;
 
+        /* Anything to do at all? */
+        if (!m->dbus_unit_queue && !m->dbus_job_queue && !m->send_reloading_done && !m->queued_message)
+                return 0;
+
+        /* Do we have overly many messages queued at the moment? If so, let's not enqueue more on top; let's sit this
+         * cycle out, and process things in a later cycle when the queues have emptied a bit. */
+        if (manager_bus_n_queued_write(m) > MANAGER_BUS_BUSY_THRESHOLD)
+                return 0;
+
+        /* Only process a certain number of units/jobs per event loop iteration. Even if the bus queue wasn't overly
+         * full before this call, we shouldn't grow it too wildly in one step, and we shouldn't monopolize CPU time
+         * with generating these messages. Note the difference in counting between this "budget" and the "threshold"
+         * above: the "budget" is decreased only once per generated message, regardless of how many busses/direct
+         * connections it is enqueued on, while the "threshold" is applied to each queued instance of a bus message,
+         * i.e. if the same message is enqueued on five busses/direct connections it will be counted five times. This
+         * difference in counting ("references" vs. "instances") is primarily a result of the fact that it's easier to
+         * implement it this way; however, it also reflects the thinking that the "threshold" should put a limit on
+         * used queue memory, i.e. space, while the "budget" should put a limit on time. Also note that the
+         * "threshold" is currently chosen much higher than the "budget". */
+        budget = MANAGER_BUS_MESSAGE_BUDGET;
+
         m->dispatching_dbus_queue = true;
 
-        while ((u = m->dbus_unit_queue)) {
+        while (budget > 0 && (u = m->dbus_unit_queue)) {
+                assert(u->in_dbus_queue);
                 bus_unit_send_change_signal(u);
-                n++;
+                n++, budget--;
         }
 
-        while ((j = m->dbus_job_queue)) {
+        while (budget > 0 && (j = m->dbus_job_queue)) {
                 assert(j->in_dbus_queue);
 
                 bus_job_send_change_signal(j);
-                n++;
+                n++, budget--;
         }
 
         m->dispatching_dbus_queue = false;
 
-        if (m->send_reloading_done) {
+        if (budget > 0 && m->send_reloading_done) {
                 m->send_reloading_done = false;
-
                 bus_manager_send_reloading(m, false);
+                n++, budget--;
         }
 
-        if (m->queued_message)
+        if (budget > 0 && m->queued_message) {
                 bus_send_queued_message(m);
+                n++;
+        }
 
         return n;
 }
diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h
index 82e7d445d28..fff6798d0f3 100644
--- a/src/systemd/sd-bus.h
+++ b/src/systemd/sd-bus.h
@@ -202,6 +202,9 @@ int sd_bus_attach_event(sd_bus *bus, sd_event *e, int priority);
 int sd_bus_detach_event(sd_bus *bus);
 sd_event *sd_bus_get_event(sd_bus *bus);
 
+int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret);
+int sd_bus_get_n_queued_write(sd_bus *bus, uint64_t *ret);
+
 int sd_bus_add_filter(sd_bus *bus, sd_bus_slot **slot, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata);
-- 
2.47.3
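
For readers unfamiliar with the new public API this patch declares in
sd-bus.h, here is a minimal, self-contained sketch (not part of the
patch) of how an sd-bus client could apply the same kind of
back-pressure check before generating more traffic. The object path,
interface, signal name and threshold value are illustrative
assumptions, not anything the patch defines; the only real dependency
is a libsystemd that exports sd_bus_get_n_queued_write().

#include <stdio.h>
#include <string.h>
#include <systemd/sd-bus.h>

/* Hypothetical limit, playing the role MANAGER_BUS_BUSY_THRESHOLD plays in the patch. */
#define EXAMPLE_BUSY_THRESHOLD 1024LU

/* Emit an (illustrative) signal only if the outgoing write queue is not already congested. */
static int emit_if_idle(sd_bus *bus) {
        uint64_t queued;
        int r;

        /* Ask sd-bus how many outgoing messages are still waiting to be written. */
        r = sd_bus_get_n_queued_write(bus, &queued);
        if (r < 0)
                return r;

        /* Skip generating more traffic while the peer is not keeping up. */
        if (queued > EXAMPLE_BUSY_THRESHOLD)
                return 0;

        return sd_bus_emit_signal(bus,
                                  "/org/example/Object",   /* assumed object path */
                                  "org.example.Interface", /* assumed interface */
                                  "SomethingChanged",
                                  NULL);                   /* no payload */
}

int main(void) {
        sd_bus *bus = NULL;
        int r;

        r = sd_bus_open_user(&bus);
        if (r < 0) {
                fprintf(stderr, "Failed to connect to user bus: %s\n", strerror(-r));
                return 1;
        }

        r = emit_if_idle(bus);
        if (r < 0)
                fprintf(stderr, "Failed to emit signal: %s\n", strerror(-r));

        sd_bus_flush(bus);
        sd_bus_unref(bus);
        return r < 0;
}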
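
The "references" vs. "instances" distinction drawn in the long comment
in manager_dispatch_dbus_queue() can also be made concrete with a toy
sketch (again not part of the patch; the interface and signal names are
made up): one generated notification fanned out to several connections
spends a single unit of "budget", while every per-connection copy adds
one more queued instance that a threshold check such as
manager_bus_n_queued_write() would count.

#include <stddef.h>
#include <systemd/sd-bus.h>

/* Broadcast one notification to every connection in 'buses'. */
static int broadcast_change(sd_bus **buses, size_t n_buses, const char *unit_path, unsigned *budget) {
        int r;

        if (*budget == 0)
                return 0; /* time budget exhausted for this iteration; retry later */

        for (size_t i = 0; i < n_buses; i++) {
                /* Each connection gets its own copy of the message, i.e. one more
                 * queued instance counted by a threshold check ("instances"). */
                r = sd_bus_emit_signal(buses[i], unit_path,
                                       "org.example.Manager", /* assumed interface */
                                       "UnitChanged",         /* assumed signal name */
                                       NULL);
                if (r < 0)
                        return r;
        }

        /* ...but the whole fan-out costs only a single unit of budget ("references"). */
        (*budget)--;
        return 1;
}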