]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
core: GC redundant device jobs from the run queue
authorLennart Poettering <lennart@poettering.net>
Tue, 15 Nov 2016 18:32:50 +0000 (19:32 +0100)
committerLennart Poettering <lennart@poettering.net>
Wed, 16 Nov 2016 14:03:26 +0000 (15:03 +0100)
In contrast to all other unit types device units when queued just track
external state, they cannot effect state changes on their own. Hence unless a
client or other job waits for them there's no reason to keep them in the job
queue. This adds a concept of GC'ing jobs of this type as soon as no client or
other job waits for them anymore.

To ensure this works correctly we need to track which clients actually
reference a job (i.e. which ones enqueued it). Unfortunately that's pretty
nasty to do for direct connections, as sd_bus_track doesn't work for
them. For now, work around this, by simply remembering in a boolean that a job
was requested by a direct connection, and reset it when we notice the direct
connection is gone. This means the GC logic works fine, except that jobs are
not immediately removed when direct connections disconnect.

In the longer term, a rework of the bus logic should fix this properly. For now
this should be good enough, as GC works for fine all cases except this one, and
thus is a clear improvement over the previous behaviour.

Fixes: #1921
src/core/dbus-job.c
src/core/dbus-job.h
src/core/dbus-unit.c
src/core/device.c
src/core/job.c
src/core/job.h
src/core/manager.c
src/core/manager.h
src/core/unit.c
src/core/unit.h
src/shared/bus-unit-util.c

index e8c69ed3e48ab62be458e9d9f5d2b80e27fb74d0..7888c163f15c38e2e4868da5ae8dedaee7180155 100644 (file)
@@ -191,3 +191,71 @@ void bus_job_send_removed_signal(Job *j) {
         if (r < 0)
                 log_debug_errno(r, "Failed to send job remove signal for %u: %m", j->id);
 }
+
+static int bus_job_track_handler(sd_bus_track *t, void *userdata) {
+        Job *j = userdata;
+
+        assert(t);
+        assert(j);
+
+        j->bus_track = sd_bus_track_unref(j->bus_track); /* make sure we aren't called again */
+
+        /* Last client dropped off the bus, maybe we should GC this now? */
+        job_add_to_gc_queue(j);
+        return 0;
+}
+
+static int bus_job_allocate_bus_track(Job *j) {
+        int r;
+
+        assert(j);
+
+        if (j->bus_track)
+                return 0;
+
+        r = sd_bus_track_new(j->unit->manager->api_bus, &j->bus_track, bus_job_track_handler, j);
+        if (r < 0)
+                return r;
+
+        return 0;
+}
+
+int bus_job_coldplug_bus_track(Job *j) {
+        int r = 0;
+
+        assert(j);
+
+        if (strv_isempty(j->deserialized_clients))
+                goto finish;
+
+        if (!j->manager->api_bus)
+                goto finish;
+
+        r = bus_job_allocate_bus_track(j);
+        if (r < 0)
+                goto finish;
+
+        r = bus_track_add_name_many(j->bus_track, j->deserialized_clients);
+
+finish:
+        j->deserialized_clients = strv_free(j->deserialized_clients);
+        return r;
+}
+
+int bus_job_track_sender(Job *j, sd_bus_message *m) {
+        int r;
+
+        assert(j);
+        assert(m);
+
+        if (sd_bus_message_get_bus(m) != j->unit->manager->api_bus) {
+                j->ref_by_private_bus = true;
+                return 0;
+        }
+
+        r = bus_job_allocate_bus_track(j);
+        if (r < 0)
+                return r;
+
+        return sd_bus_track_add_sender(j->bus_track, m);
+}
index 024d06719ea89311b0e898fe94cecb0c6b12c322..f9148895be31822b133c9455162b258f61bd1d6e 100644 (file)
@@ -29,3 +29,6 @@ int bus_job_method_cancel(sd_bus_message *message, void *job, sd_bus_error *erro
 
 void bus_job_send_change_signal(Job *j);
 void bus_job_send_removed_signal(Job *j);
+
+int bus_job_coldplug_bus_track(Job *j);
+int bus_job_track_sender(Job *j, sd_bus_message *m);
index 90cf5651cabd0ec0cd297b9c8bbbc48d369b7de6..2adc1d9288a75833aa19f4f5071a747ef8f8454a 100644 (file)
@@ -22,6 +22,7 @@
 #include "alloc-util.h"
 #include "bus-common-errors.h"
 #include "cgroup-util.h"
+#include "dbus-job.h"
 #include "dbus-unit.h"
 #include "dbus.h"
 #include "fd-util.h"
@@ -1223,17 +1224,9 @@ int bus_unit_queue_job(
         if (r < 0)
                 return r;
 
-        if (sd_bus_message_get_bus(message) == u->manager->api_bus) {
-                if (!j->bus_track) {
-                        r = sd_bus_track_new(sd_bus_message_get_bus(message), &j->bus_track, NULL, NULL);
-                        if (r < 0)
-                                return r;
-                }
-
-                r = sd_bus_track_add_sender(j->bus_track, message);
-                if (r < 0)
-                        return r;
-        }
+        r = bus_job_track_sender(j, message);
+        if (r < 0)
+                return r;
 
         path = job_dbus_path(j);
         if (!path)
@@ -1507,7 +1500,7 @@ int bus_unit_check_load_state(Unit *u, sd_bus_error *error) {
         return sd_bus_error_set_errnof(error, u->load_error, "Unit %s is not loaded properly: %m.", u->id);
 }
 
-static int bus_track_handler(sd_bus_track *t, void *userdata) {
+static int bus_unit_track_handler(sd_bus_track *t, void *userdata) {
         Unit *u = userdata;
 
         assert(t);
@@ -1519,7 +1512,7 @@ static int bus_track_handler(sd_bus_track *t, void *userdata) {
         return 0;
 }
 
-static int allocate_bus_track(Unit *u) {
+static int bus_unit_allocate_bus_track(Unit *u) {
         int r;
 
         assert(u);
@@ -1527,7 +1520,7 @@ static int allocate_bus_track(Unit *u) {
         if (u->bus_track)
                 return 0;
 
-        r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_track_handler, u);
+        r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_unit_track_handler, u);
         if (r < 0)
                 return r;
 
@@ -1545,7 +1538,7 @@ int bus_unit_track_add_name(Unit *u, const char *name) {
 
         assert(u);
 
-        r = allocate_bus_track(u);
+        r = bus_unit_allocate_bus_track(u);
         if (r < 0)
                 return r;
 
@@ -1557,7 +1550,7 @@ int bus_unit_track_add_sender(Unit *u, sd_bus_message *m) {
 
         assert(u);
 
-        r = allocate_bus_track(u);
+        r = bus_unit_allocate_bus_track(u);
         if (r < 0)
                 return r;
 
index c572a6737cd75a177fdaffac48fd58b3afd5bc79..074e93ffe2ea0ea7ab4c9bdfb53831430d21cd6b 100644 (file)
@@ -831,6 +831,8 @@ const UnitVTable device_vtable = {
                 "Device\0"
                 "Install\0",
 
+        .gc_jobs = true,
+
         .init = device_init,
         .done = device_done,
         .load = unit_load_fragment_and_dropin_optional,
index 3a20da6d06e519d353ae77be6b915f4b43f0146c..d6e71d68efd910c8fafb35d3863c75be0c92110a 100644 (file)
@@ -90,6 +90,9 @@ void job_free(Job *j) {
         if (j->in_dbus_queue)
                 LIST_REMOVE(dbus_queue, j->manager->dbus_job_queue, j);
 
+        if (j->in_gc_queue)
+                LIST_REMOVE(gc_queue, j->manager->gc_job_queue, j);
+
         sd_event_source_unref(j->timer_event_source);
 
         sd_bus_track_unref(j->bus_track);
@@ -226,6 +229,9 @@ Job* job_install(Job *j) {
         log_unit_debug(j->unit,
                        "Installed new job %s/%s as %u",
                        j->unit->id, job_type_to_string(j->type), (unsigned) j->id);
+
+        job_add_to_gc_queue(j);
+
         return j;
 }
 
@@ -639,6 +645,7 @@ _pure_ static const char *job_get_status_message_format(Unit *u, JobType t, JobR
                 [JOB_DEPENDENCY]  = "Dependency failed for %s.",
                 [JOB_ASSERT]      = "Assertion failed for %s.",
                 [JOB_UNSUPPORTED] = "Starting of %s not supported.",
+                [JOB_COLLECTED]   = "Unecessary job for %s was removed.",
         };
         static const char *const generic_finished_stop_job[_JOB_RESULT_MAX] = {
                 [JOB_DONE]        = "Stopped %s.",
@@ -698,6 +705,7 @@ static void job_print_status_message(Unit *u, JobType t, JobResult result) {
                 [JOB_SKIPPED]     = { ANSI_HIGHLIGHT,        " INFO " },
                 [JOB_ASSERT]      = { ANSI_HIGHLIGHT_YELLOW, "ASSERT" },
                 [JOB_UNSUPPORTED] = { ANSI_HIGHLIGHT_YELLOW, "UNSUPP" },
+                [JOB_COLLECTED]   = { ANSI_HIGHLIGHT,        " INFO " },
         };
 
         const char *format;
@@ -749,6 +757,7 @@ static void job_log_status_message(Unit *u, JobType t, JobResult result) {
                 [JOB_INVALID]     = LOG_INFO,
                 [JOB_ASSERT]      = LOG_WARNING,
                 [JOB_UNSUPPORTED] = LOG_WARNING,
+                [JOB_COLLECTED]   = LOG_INFO,
         };
 
         assert(u);
@@ -860,6 +869,7 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr
                 job_set_state(j, JOB_WAITING);
 
                 job_add_to_run_queue(j);
+                job_add_to_gc_queue(j);
 
                 goto finish;
         }
@@ -903,11 +913,15 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr
 finish:
         /* Try to start the next jobs that can be started */
         SET_FOREACH(other, u->dependencies[UNIT_AFTER], i)
-                if (other->job)
+                if (other->job) {
                         job_add_to_run_queue(other->job);
+                        job_add_to_gc_queue(other->job);
+                }
         SET_FOREACH(other, u->dependencies[UNIT_BEFORE], i)
-                if (other->job)
+                if (other->job) {
                         job_add_to_run_queue(other->job);
+                        job_add_to_gc_queue(other->job);
+                }
 
         manager_check_finished(u->manager);
 
@@ -1121,12 +1135,14 @@ int job_coldplug(Job *j) {
 
         /* After deserialization is complete and the bus connection
          * set up again, let's start watching our subscribers again */
-        (void) bus_track_coldplug(j->manager, &j->bus_track, false, j->deserialized_clients);
-        j->deserialized_clients = strv_free(j->deserialized_clients);
+        (void) bus_job_coldplug_bus_track(j);
 
         if (j->state == JOB_WAITING)
                 job_add_to_run_queue(j);
 
+        /* Maybe due to new dependencies we don't actually need this job anymore? */
+        job_add_to_gc_queue(j);
+
         if (j->begin_usec == 0 || j->unit->job_timeout == USEC_INFINITY)
                 return 0;
 
@@ -1201,6 +1217,95 @@ int job_get_timeout(Job *j, usec_t *timeout) {
         return 1;
 }
 
+bool job_check_gc(Job *j) {
+        Unit *other;
+        Iterator i;
+
+        assert(j);
+
+        /* Checks whether this job should be GC'ed away. We only do this for jobs of units that have no effect on their
+         * own and just track external state. For now the only unit type that qualifies for this are .device units. */
+
+        if (!UNIT_VTABLE(j->unit)->gc_jobs)
+                return true;
+
+        if (sd_bus_track_count(j->bus_track) > 0)
+                return true;
+
+        /* FIXME: So this is a bit ugly: for now we don't properly track references made via private bus connections
+         * (because it's nasty, as sd_bus_track doesn't apply to it). We simply remember that the job was once
+         * referenced by one, and reset this whenever we notice that no private bus connections are around. This means
+         * the GC is a bit too conservative when it comes to jobs created by private bus connections. */
+        if (j->ref_by_private_bus) {
+                if (set_isempty(j->unit->manager->private_buses))
+                        j->ref_by_private_bus = false;
+                else
+                        return true;
+        }
+
+        if (j->type == JOB_NOP)
+                return true;
+
+        /* If a job is ordered after ours, and is to be started, then it needs to wait for us, regardless if we stop or
+         * start, hence let's not GC in that case. */
+        SET_FOREACH(other, j->unit->dependencies[UNIT_BEFORE], i) {
+                if (!other->job)
+                        continue;
+
+                if (other->job->ignore_order)
+                        continue;
+
+                if (IN_SET(other->job->type, JOB_START, JOB_VERIFY_ACTIVE, JOB_RELOAD))
+                        return true;
+        }
+
+        /* If we are going down, but something else is orederd After= us, then it needs to wait for us */
+        if (IN_SET(j->type, JOB_STOP, JOB_RESTART)) {
+
+                SET_FOREACH(other, j->unit->dependencies[UNIT_AFTER], i) {
+                        if (!other->job)
+                                continue;
+
+                        if (other->job->ignore_order)
+                                continue;
+
+                        return true;
+                }
+        }
+
+        /* The logic above is kinda the inverse of the job_is_runnable() logic. Specifically, if the job "we" is
+         * ordered before the job "other":
+         *
+         *  we start + other start → stay
+         *  we start + other stop  → gc
+         *  we stop  + other start → stay
+         *  we stop  + other stop  → gc
+         *
+         * "we" are ordered after "other":
+         *
+         *  we start + other start → gc
+         *  we start + other stop  → gc
+         *  we stop  + other start → stay
+         *  we stop  + other stop  → stay
+         *
+         */
+
+        return false;
+}
+
+void job_add_to_gc_queue(Job *j) {
+        assert(j);
+
+        if (j->in_gc_queue)
+                return;
+
+        if (job_check_gc(j))
+                return;
+
+        LIST_PREPEND(gc_queue, j->unit->manager->gc_job_queue, j);
+        j->in_gc_queue = true;
+}
+
 static const char* const job_state_table[_JOB_STATE_MAX] = {
         [JOB_WAITING] = "waiting",
         [JOB_RUNNING] = "running",
@@ -1244,6 +1349,7 @@ static const char* const job_result_table[_JOB_RESULT_MAX] = {
         [JOB_INVALID] = "invalid",
         [JOB_ASSERT] = "assert",
         [JOB_UNSUPPORTED] = "unsupported",
+        [JOB_COLLECTED] = "collected",
 };
 
 DEFINE_STRING_TABLE_LOOKUP(job_result, JobResult);
index ccfc7def4d71660833445b3dc534b573dc553b03..6fdec9f2268428a7e7eafb31534b799ed303a48e 100644 (file)
@@ -107,6 +107,7 @@ enum JobResult {
         JOB_INVALID,             /* JOB_RELOAD of inactive unit */
         JOB_ASSERT,              /* Couldn't start a unit, because an assert didn't hold */
         JOB_UNSUPPORTED,         /* Couldn't start a unit, because the unit type is not supported on the system */
+        JOB_COLLECTED,           /* Job was garbage collected, since nothing needed it anymore */
         _JOB_RESULT_MAX,
         _JOB_RESULT_INVALID = -1
 };
@@ -133,6 +134,7 @@ struct Job {
         LIST_FIELDS(Job, transaction);
         LIST_FIELDS(Job, run_queue);
         LIST_FIELDS(Job, dbus_queue);
+        LIST_FIELDS(Job, gc_queue);
 
         LIST_HEAD(JobDependency, subject_list);
         LIST_HEAD(JobDependency, object_list);
@@ -168,6 +170,8 @@ struct Job {
         bool sent_dbus_new_signal:1;
         bool ignore_order:1;
         bool irreversible:1;
+        bool in_gc_queue:1;
+        bool ref_by_private_bus:1;
 };
 
 Job* job_new(Unit *unit, JobType type);
@@ -227,6 +231,9 @@ void job_shutdown_magic(Job *j);
 
 int job_get_timeout(Job *j, usec_t *timeout) _pure_;
 
+bool job_check_gc(Job *j);
+void job_add_to_gc_queue(Job *j);
+
 const char* job_type_to_string(JobType t) _const_;
 JobType job_type_from_string(const char *s) _pure_;
 
index dc81af949229e26944d9c0e36263c65ad3c9ca85..31770eef3a8c1d3e7ff41e5abfd3e540ded45f98 100644 (file)
@@ -981,10 +981,9 @@ good:
         unit_gc_mark_good(u, gc_marker);
 }
 
-static unsigned manager_dispatch_gc_queue(Manager *m) {
+static unsigned manager_dispatch_gc_unit_queue(Manager *m) {
+        unsigned n = 0, gc_marker;
         Unit *u;
-        unsigned n = 0;
-        unsigned gc_marker;
 
         assert(m);
 
@@ -996,12 +995,12 @@ static unsigned manager_dispatch_gc_queue(Manager *m) {
 
         gc_marker = m->gc_marker;
 
-        while ((u = m->gc_queue)) {
+        while ((u = m->gc_unit_queue)) {
                 assert(u->in_gc_queue);
 
                 unit_gc_sweep(u, gc_marker);
 
-                LIST_REMOVE(gc_queue, m->gc_queue, u);
+                LIST_REMOVE(gc_queue, m->gc_unit_queue, u);
                 u->in_gc_queue = false;
 
                 n++;
@@ -1018,6 +1017,30 @@ static unsigned manager_dispatch_gc_queue(Manager *m) {
         return n;
 }
 
+static unsigned manager_dispatch_gc_job_queue(Manager *m) {
+        unsigned n = 0;
+        Job *j;
+
+        assert(m);
+
+        while ((j = m->gc_job_queue)) {
+                assert(j->in_gc_queue);
+
+                LIST_REMOVE(gc_queue, m->gc_job_queue, j);
+                j->in_gc_queue = false;
+
+                n++;
+
+                if (job_check_gc(j))
+                        continue;
+
+                log_unit_debug(j->unit, "Collecting job.");
+                (void) job_finish_and_invalidate(j, JOB_COLLECTED, false, false);
+        }
+
+        return n;
+}
+
 static void manager_clear_jobs_and_units(Manager *m) {
         Unit *u;
 
@@ -1033,7 +1056,8 @@ static void manager_clear_jobs_and_units(Manager *m) {
         assert(!m->dbus_unit_queue);
         assert(!m->dbus_job_queue);
         assert(!m->cleanup_queue);
-        assert(!m->gc_queue);
+        assert(!m->gc_unit_queue);
+        assert(!m->gc_job_queue);
 
         assert(hashmap_isempty(m->jobs));
         assert(hashmap_isempty(m->units));
@@ -2226,7 +2250,10 @@ int manager_loop(Manager *m) {
                 if (manager_dispatch_load_queue(m) > 0)
                         continue;
 
-                if (manager_dispatch_gc_queue(m) > 0)
+                if (manager_dispatch_gc_job_queue(m) > 0)
+                        continue;
+
+                if (manager_dispatch_gc_unit_queue(m) > 0)
                         continue;
 
                 if (manager_dispatch_cleanup_queue(m) > 0)
index aa3f95e8e084de24d3e4dab8b390600e1acc4a21..d54ca5410755e2ef1a3c72a3de66e6b4b1f0aca6 100644 (file)
@@ -104,8 +104,9 @@ struct Manager {
         /* Units to remove */
         LIST_HEAD(Unit, cleanup_queue);
 
-        /* Units to check when doing GC */
-        LIST_HEAD(Unit, gc_queue);
+        /* Units and jobs to check when doing GC */
+        LIST_HEAD(Unit, gc_unit_queue);
+        LIST_HEAD(Job, gc_job_queue);
 
         /* Units that should be realized */
         LIST_HEAD(Unit, cgroup_queue);
index df60a5bf04f5cb54eca4258506ad2d2657837564..fbb21e4985f9dc731ac256d548fea42be8bd0552 100644 (file)
@@ -389,7 +389,7 @@ void unit_add_to_gc_queue(Unit *u) {
         if (unit_check_gc(u))
                 return;
 
-        LIST_PREPEND(gc_queue, u->manager->gc_queue, u);
+        LIST_PREPEND(gc_queue, u->manager->gc_unit_queue, u);
         u->in_gc_queue = true;
 }
 
@@ -569,7 +569,7 @@ void unit_free(Unit *u) {
                 LIST_REMOVE(cleanup_queue, u->manager->cleanup_queue, u);
 
         if (u->in_gc_queue)
-                LIST_REMOVE(gc_queue, u->manager->gc_queue, u);
+                LIST_REMOVE(gc_queue, u->manager->gc_unit_queue, u);
 
         if (u->in_cgroup_queue)
                 LIST_REMOVE(cgroup_queue, u->manager->cgroup_queue, u);
index 991543664bbcba71a606d505c518ccac92635d07..6d6885b487a84fe36e8ec94a93df6ae2efd2e3bc 100644 (file)
@@ -441,6 +441,9 @@ struct UnitVTable {
 
         /* True if transient units of this type are OK */
         bool can_transient:1;
+
+        /* True if queued jobs of this type should be GC'ed if no other job needs them anymore */
+        bool gc_jobs:1;
 };
 
 extern const UnitVTable * const unit_vtable[_UNIT_TYPE_MAX];
index 4f66497f3a9f60894eaf9c483227acf5b0adda40..a7702602ebea676bf72b8fcc5ddf85c41031868a 100644 (file)
@@ -838,6 +838,8 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const*
                         log_error("Assertion failed on job for %s.", strna(d->name));
                 else if (streq(d->result, "unsupported"))
                         log_error("Operation on or unit type of %s not supported on this system.", strna(d->name));
+                else if (streq(d->result, "collected"))
+                        log_error("Queued job for %s was garbage collected.", strna(d->name));
                 else if (!streq(d->result, "done") && !streq(d->result, "skipped")) {
                         if (d->name) {
                                 int q;
@@ -853,7 +855,7 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const*
                 }
         }
 
-        if (streq(d->result, "canceled"))
+        if (STR_IN_SET(d->result, "canceled", "collected"))
                 r = -ECANCELED;
         else if (streq(d->result, "timeout"))
                 r = -ETIME;