]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
core: don't process dbus unit and job queue when there are already too many messages... 8171/head
authorLennart Poettering <lennart@poettering.net>
Tue, 13 Feb 2018 17:30:34 +0000 (18:30 +0100)
committerLennart Poettering <lennart@poettering.net>
Tue, 27 Feb 2018 18:54:29 +0000 (19:54 +0100)
We maintain a queue of units and jobs that we are supposed to generate
change/new notifications for because they were either just created or
some of their property has changed. Let's throttle processing of this
queue a bit: as soon as > 1K of bus messages are queued for writing
let's skip processing the queue, and then recheck on the next
iteration again.

Moreover, never process more than 100 units in one go, return to the
event loop after that. Both limits together should put effective limits
on both space and time usage of the function, delaying further
operations until a later moment, when the queue is empty or the the
event loop is sufficiently idle again.

This should keep the number of generated messages much lower than
before on busy systems or where some client is hanging.

Note that this also means a bad client can slow down message dispatching
substantially for up to 90s if it likes to, for all clients. But that
should be acceptable as we only allow trusted bus clients, anyway.

Fixes: #8166
src/core/dbus.c
src/core/dbus.h
src/core/manager.c
src/systemd/sd-bus.h

index d023c8715c2893142bfd985d65aa646333cea588..56b43adcdaadda2f6fe17a78a402828fe7d305d8 100644 (file)
@@ -1279,3 +1279,34 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
 int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error) {
         return bus_verify_polkit_async(call, CAP_SYS_ADMIN, "org.freedesktop.systemd1.set-environment", NULL, false, UID_INVALID, &m->polkit_registry, error);
 }
+
+uint64_t manager_bus_n_queued_write(Manager *m) {
+        uint64_t c = 0;
+        Iterator i;
+        sd_bus *b;
+        int r;
+
+        /* Returns the total number of messages queued for writing on all our direct and API busses. */
+
+        SET_FOREACH(b, m->private_buses, i) {
+                uint64_t k;
+
+                r = sd_bus_get_n_queued_write(b, &k);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to query queued messages for private bus: %m");
+                else
+                        c += k;
+        }
+
+        if (m->api_bus) {
+                uint64_t k;
+
+                r = sd_bus_get_n_queued_write(m->api_bus, &k);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to query queued messages for API bus: %m");
+                else
+                        c += k;
+        }
+
+        return c;
+}
index 3051ec0665846cd87a7f74043597aab7e389eb99..702bc48ae1817dcea68846a391716d7f60be58be 100644 (file)
@@ -48,3 +48,5 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
 int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error);
 
 int bus_forward_agent_released(Manager *m, const char *path);
+
+uint64_t manager_bus_n_queued_write(Manager *m);
index ebac47024c7ed9acc7e4a7cc63d2854200dacb4e..84adb9c6660f5c126ce971b3832aaa1fdeccc824 100644 (file)
 #define JOBS_IN_PROGRESS_PERIOD_USEC (USEC_PER_SEC / 3)
 #define JOBS_IN_PROGRESS_PERIOD_DIVISOR 3
 
+/* If there are more than 1K bus messages queue across our API and direct busses, then let's not add more on top until
+ * the queue gets more empty. */
+#define MANAGER_BUS_BUSY_THRESHOLD 1024LU
+
+/* How many units and jobs to process of the bus queue before returning to the event loop. */
+#define MANAGER_BUS_MESSAGE_BUDGET 100U
+
 static int manager_dispatch_notify_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
 static int manager_dispatch_cgroups_agent_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
 static int manager_dispatch_signal_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
@@ -1886,41 +1893,65 @@ static int manager_dispatch_run_queue(sd_event_source *source, void *userdata) {
 }
 
 static unsigned manager_dispatch_dbus_queue(Manager *m) {
-        Job *j;
+        unsigned n = 0, budget;
         Unit *u;
-        unsigned n = 0;
+        Job *j;
 
         assert(m);
 
         if (m->dispatching_dbus_queue)
                 return 0;
 
+        /* Anything to do at all? */
+        if (!m->dbus_unit_queue && !m->dbus_job_queue && !m->send_reloading_done && !m->queued_message)
+                return 0;
+
+        /* Do we have overly many messages queued at the moment? If so, let's not enqueue more on top, let's sit this
+         * cycle out, and process things in a later cycle when the queues got a bit emptier. */
+        if (manager_bus_n_queued_write(m) > MANAGER_BUS_BUSY_THRESHOLD)
+                return 0;
+
+        /* Only process a certain number of units/jobs per event loop iteration. Even if the bus queue wasn't overly
+         * full before this call we shouldn't increase it in size too wildly in one step, and we shouldn't monopolize
+         * CPU time with generating these messages. Note the difference in counting of this "budget" and the
+         * "threshold" above: the "budget" is decreased only once per generated message, regardless how many
+         * busses/direct connections it is enqueued on, while the "threshold" is applied to each queued instance of bus
+         * message, i.e. if the same message is enqueued to five busses/direct connections it will be counted five
+         * times. This difference in counting ("references" vs. "instances") is primarily a result of the fact that
+         * it's easier to implement it this way, however it also reflects the thinking that the "threshold" should put
+         * a limit on used queue memory, i.e. space, while the "budget" should put a limit on time. Also note that
+         * the "threshold" is currently chosen much higher than the "budget". */
+        budget = MANAGER_BUS_MESSAGE_BUDGET;
+
         m->dispatching_dbus_queue = true;
 
-        while ((u = m->dbus_unit_queue)) {
+        while (budget > 0 && (u = m->dbus_unit_queue)) {
+
                 assert(u->in_dbus_queue);
 
                 bus_unit_send_change_signal(u);
-                n++;
+                n++, budget--;
         }
 
-        while ((j = m->dbus_job_queue)) {
+        while (budget > 0 && (j = m->dbus_job_queue)) {
                 assert(j->in_dbus_queue);
 
                 bus_job_send_change_signal(j);
-                n++;
+                n++, budget--;
         }
 
         m->dispatching_dbus_queue = false;
 
-        if (m->send_reloading_done) {
+        if (budget > 0 && m->send_reloading_done) {
                 m->send_reloading_done = false;
-
                 bus_manager_send_reloading(m, false);
+                n++, budget--;
         }
 
-        if (m->queued_message)
+        if (budget > 0 && m->queued_message) {
                 bus_send_queued_message(m);
+                n++;
+        }
 
         return n;
 }
index 82e7d445d28328ff41df09bd9faa16f60feaebf7..fff6798d0f3c4eaeb9b25d78491bac8c5e0deb10 100644 (file)
@@ -202,6 +202,9 @@ int sd_bus_attach_event(sd_bus *bus, sd_event *e, int priority);
 int sd_bus_detach_event(sd_bus *bus);
 sd_event *sd_bus_get_event(sd_bus *bus);
 
+int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret);
+int sd_bus_get_n_queued_write(sd_bus *bus, uint64_t *ret);
+
 int sd_bus_add_filter(sd_bus *bus, sd_bus_slot **slot, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata);