int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error) {
return bus_verify_polkit_async(call, CAP_SYS_ADMIN, "org.freedesktop.systemd1.set-environment", NULL, false, UID_INVALID, &m->polkit_registry, error);
}
+
+uint64_t manager_bus_n_queued_write(Manager *m) {
+ uint64_t c = 0;
+ Iterator i;
+ sd_bus *b;
+ int r;
+
+ /* Returns the total number of messages queued for writing on all our direct and API busses. */
+
+ SET_FOREACH(b, m->private_buses, i) {
+ uint64_t k;
+
+ r = sd_bus_get_n_queued_write(b, &k);
+ if (r < 0)
+ log_debug_errno(r, "Failed to query queued messages for private bus: %m");
+ else
+ c += k;
+ }
+
+ if (m->api_bus) {
+ uint64_t k;
+
+ r = sd_bus_get_n_queued_write(m->api_bus, &k);
+ if (r < 0)
+ log_debug_errno(r, "Failed to query queued messages for API bus: %m");
+ else
+ c += k;
+ }
+
+ return c;
+}
#define JOBS_IN_PROGRESS_PERIOD_USEC (USEC_PER_SEC / 3)
#define JOBS_IN_PROGRESS_PERIOD_DIVISOR 3
+/* If there are more than 1K bus messages queue across our API and direct busses, then let's not add more on top until
+ * the queue gets more empty. */
+#define MANAGER_BUS_BUSY_THRESHOLD 1024LU
+
+/* How many units and jobs to process of the bus queue before returning to the event loop. */
+#define MANAGER_BUS_MESSAGE_BUDGET 100U
+
static int manager_dispatch_notify_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
static int manager_dispatch_cgroups_agent_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
static int manager_dispatch_signal_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
}
static unsigned manager_dispatch_dbus_queue(Manager *m) {
- Job *j;
+ unsigned n = 0, budget;
Unit *u;
- unsigned n = 0;
+ Job *j;
assert(m);
if (m->dispatching_dbus_queue)
return 0;
+ /* Anything to do at all? */
+ if (!m->dbus_unit_queue && !m->dbus_job_queue && !m->send_reloading_done && !m->queued_message)
+ return 0;
+
+ /* Do we have overly many messages queued at the moment? If so, let's not enqueue more on top, let's sit this
+ * cycle out, and process things in a later cycle when the queues got a bit emptier. */
+ if (manager_bus_n_queued_write(m) > MANAGER_BUS_BUSY_THRESHOLD)
+ return 0;
+
+ /* Only process a certain number of units/jobs per event loop iteration. Even if the bus queue wasn't overly
+ * full before this call we shouldn't increase it in size too wildly in one step, and we shouldn't monopolize
+ * CPU time with generating these messages. Note the difference in counting of this "budget" and the
+ * "threshold" above: the "budget" is decreased only once per generated message, regardless how many
+ * busses/direct connections it is enqueued on, while the "threshold" is applied to each queued instance of bus
+ * message, i.e. if the same message is enqueued to five busses/direct connections it will be counted five
+ * times. This difference in counting ("references" vs. "instances") is primarily a result of the fact that
+ * it's easier to implement it this way, however it also reflects the thinking that the "threshold" should put
+ * a limit on used queue memory, i.e. space, while the "budget" should put a limit on time. Also note that
+ * the "threshold" is currently chosen much higher than the "budget". */
+ budget = MANAGER_BUS_MESSAGE_BUDGET;
+
m->dispatching_dbus_queue = true;
- while ((u = m->dbus_unit_queue)) {
+ while (budget > 0 && (u = m->dbus_unit_queue)) {
+
assert(u->in_dbus_queue);
bus_unit_send_change_signal(u);
- n++;
+ n++, budget--;
}
- while ((j = m->dbus_job_queue)) {
+ while (budget > 0 && (j = m->dbus_job_queue)) {
assert(j->in_dbus_queue);
bus_job_send_change_signal(j);
- n++;
+ n++, budget--;
}
m->dispatching_dbus_queue = false;
- if (m->send_reloading_done) {
+ if (budget > 0 && m->send_reloading_done) {
m->send_reloading_done = false;
-
bus_manager_send_reloading(m, false);
+ n++, budget--;
}
- if (m->queued_message)
+ if (budget > 0 && m->queued_message) {
bus_send_queued_message(m);
+ n++;
+ }
return n;
}
int sd_bus_detach_event(sd_bus *bus);
sd_event *sd_bus_get_event(sd_bus *bus);
+int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret);
+int sd_bus_get_n_queued_write(sd_bus *bus, uint64_t *ret);
+
int sd_bus_add_filter(sd_bus *bus, sd_bus_slot **slot, sd_bus_message_handler_t callback, void *userdata);
int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata);
int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata);