]> git.ipfire.org Git - people/jschlag/pbs.git/commitdiff
Rewrite bigger chunks of the job schedulung code.
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 21 Dec 2012 15:44:11 +0000 (16:44 +0100)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 21 Dec 2012 15:44:11 +0000 (16:44 +0100)
This patch fixes #10164 and some more parts have been
rewritten/duplicate code has been removed.

backend/builders.py
backend/builds.py
data/templates/builder-detail.html
hub/handlers.py
web/handlers.py
web/handlers_builders.py

index b27fea1a6d23f02b93ac801a9336b1b018305f10..9dcf78705446f13ce2ad395a142b6e0063aacfa9 100644 (file)
@@ -539,30 +539,36 @@ class Builder(base.Object):
 
                return "online"
 
-       def get_active_jobs(self, count=False):
-               query = self.db.query("\
-                       SELECT * FROM jobs \
-                       WHERE \
-                               jobs.builder_id = %s AND \
-                               (jobs.state = 'dispatching' OR jobs.state = 'running' OR jobs.state = 'uploading') \
-                       ORDER BY time_started ASC",
-                       self.id)
+       def get_active_jobs(self, *args, **kwargs):
+               if self._active_jobs is None:
+                       self._active_jobs = self.pakfire.jobs.get_active(builder=self, *args, **kwargs)
+
+               return self._active_jobs
+
+       def count_active_jobs(self):
+               return len(self.get_active_jobs())
+
+       @property
+       def too_many_jobs(self):
+               """
+                       Tell if this host is already running enough or too many jobs.
+               """
+               return self.count_active_jobs() >= self.max_jobs
 
-               if count:
-                       return len(query)
+       def get_next_jobs(self, arches=None, limit=None):
+               if arches is None:
+                       arches = self.get_arches()
 
-               jobs = []
-               for job in query:
-                       job = self.pakfire.jobs.get_by_id(job.id, job)
-                       jobs.append(job)
+               return self.pakfire.jobs.get_next(arches=arches, builder=self,
+                       state="pending", limit=limit)
 
-               return jobs
+       def get_next_job(self, *args, **kwargs):
+               kwargs["limit"] = 1
 
-       def count_active_jobs(self):
-               if self._active_jobs is None:
-                       self._active_jobs = self.get_active_jobs(count=True)
+               jobs = self.get_next_jobs(*args, **kwargs)
 
-               return self._active_jobs
+               if jobs:
+                       return jobs[0]
 
        def get_history(self, *args, **kwargs):
                kwargs["builder"] = self
index 403fe9511dd8087d967600e09eaf8e9a10b66fc5..e67be2fab540d5bfcde5d5407668cca8430e7285 100644 (file)
@@ -663,7 +663,6 @@ class Build(base.Object):
 
                if self._data:
                        self._data["state"] = state
-               self.clear_cache()
 
                # In broken state, the removal from the repository is forced and
                # all jobs that are not finished yet will be aborted.
@@ -1303,17 +1302,15 @@ class Jobs(base.Object):
                # Return sorted list of jobs.
                return sorted(jobs)
 
-       def get_active(self, host_id=None, uploads=True, running_only=False):
-               running_states = ["dispatching", "running"]
-
-               if not running_only:
-                       running_states += ["new", "pending",]
+       def get_active(self, host_id=None, builder=None, states=None):
+               if builder:
+                       host_id = builder.id
 
-               if uploads:
-                       running_states.append("uploading")
+               if states is None:
+                       states = ["dispatching", "running", "uploading"]
 
-               query = "SELECT * FROM jobs WHERE (%s)" % \
-                       " OR ".join(["state = '%s'" % s for s in running_states])
+               query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
+               args = states
 
                if host_id:
                        query += " AND builder_id = %s" % host_id
@@ -1327,40 +1324,72 @@ class Jobs(base.Object):
                                WHEN jobs.state = 'new'         THEN 4 \
                        END, time_started ASC"
 
-               return [Job(self.pakfire, j.id, j) for j in self.db.query(query)]
+               return [Job(self.pakfire, j.id, j) for j in self.db.query(query, *args)]
 
-       def get_next_iter(self, arches=None, limit=None, offset=None, type=None, states=["pending", "new"], max_tries=None):
-               args = []
-               conditions = [
-                       "(start_not_before IS NULL OR start_not_before <= NOW())",
-               ]
+       def get_next_iter(self, *args, **kwargs):
+               return iter(self.get_next(*args, **kwargs))
 
-               if type:
-                       conditions.append("jobs.type = %s")
-                       args.append(type)
+       def get_next(self, arches=None, builder=None, limit=None, offset=None, type=None,
+                       state=None, states=None, max_tries=None):
 
-               if states:
-                       conditions.append("(%s)" % " OR ".join(["jobs.state = %s" for state in states]))
-                       args += states
+               if state is None and states is None:
+                       states = ["pending", "new"]
+
+               if builder and arches is None:
+                       arches = builder.get_arches()
+
+               query = "SELECT jobs.* FROM jobs \
+                                       JOIN builds ON jobs.build_id = builds.id \
+                               WHERE \
+                                       (start_not_before IS NULL OR start_not_before <= NOW())"
+               args = []
 
                if arches:
-                       conditions.append("(%s)" % " OR ".join(["jobs.arch_id = %s" for a in arches]))
-                       args += [a.id for a in arches]
+                       query += " AND jobs.arch_id IN (%s)" % ", ".join(["%s"] * len(arches))
+                       args.extend([a.id for a in arches])
+
+               if builder:
+                       #query += " AND (jobs.builder_id = %s OR jobs.builder_id IS NULL)"
+                       #args.append(builder.id)
+
+                       # Check out which types of builds this builder builds.
+                       build_types = []
+                       for build_type in builder.build_types:
+                               if build_type == "release":
+                                       build_types.append("(builds.type = 'release' AND jobs.type = 'build')")
+                               elif build_type == "scratch":
+                                       build_types.append("(builds.type = 'scratch' AND jobs.type = 'build')")
+                               elif build_type == "test":
+                                       build_types.append("jobs.type = 'test'")
+
+                       if build_types:
+                               query += " AND (%s)" % " OR ".join(build_types)
 
-               # Only return jobs with up to max_tries tries.
                if max_tries:
-                       conditions.append("jobs.tries <= %s")
+                       query += " AND jobs.max_tries <= %s"
                        args.append(max_tries)
 
-               query = "SELECT jobs.* FROM jobs \
-                       JOIN builds ON jobs.build_id = builds.id"
+               if state:
+                       query += " AND jobs.state = %s"
+                       args.append(state)
 
-               if conditions:
-                       query += " WHERE %s" % " AND ".join(conditions)
+               if states:
+                       query += " AND jobs.state IN (%s)" % ", ".join(["%s"] * len(states))
+                       args.extend(states)
+
+               if type:
+                       query += " AND jobs.type = %s"
+                       args.append(type)
+
+               # Order builds.
+               #  Release builds and scratch builds are more important than test builds.
+               #  Builds are sorted by priority and older builds are preferred.
 
-               # Choose the oldest one at first, but prefer real builds instead of
-               # test builds.
                query += " ORDER BY \
+                       CASE \
+                               WHEN jobs.state = 'pending' THEN 0 \
+                               WHEN jobs.state = 'new'     THEN 1 \
+                       END, \
                        CASE \
                                WHEN jobs.type = 'build' THEN 0 \
                                WHEN jobs.type = 'test'  THEN 1 \
@@ -1368,21 +1397,12 @@ class Jobs(base.Object):
                        builds.priority DESC, jobs.time_created ASC"
 
                if limit:
-                       if offset:
-                               query += " LIMIT %s,%s"
-                               args += [limit, offset]
-                       else:
-                               query += " LIMIT %s"
-                               args += [limit]
-
-               for job in self.db.query(query, *args):
-                       yield Job(self.pakfire, job.id, job)
+                       query += " LIMIT %s"
+                       args.append(limit)
 
-       def get_next(self, *args, **kwargs):
                jobs = []
-
-               # Fetch all objects right now.
-               for job in self.get_next_iter(*args, **kwargs):
+               for row in self.db.query(query, *args):
+                       job = self.pakfire.jobs.get_by_id(row.id, row)
                        jobs.append(job)
 
                return jobs
@@ -1580,12 +1600,7 @@ class Job(base.Object):
        @property
        def data(self):
                if self._data is None:
-                       data = self.cache.get(self.cache_key)
-                       if not data:
-                               data = self.db.get("SELECT * FROM jobs WHERE id = %s", self.id)
-                               self.cache.set(self.cache_key, data)
-
-                       self._data = data
+                       self._data = self.db.get("SELECT * FROM jobs WHERE id = %s", self.id)
                        assert self._data
 
                return self._data
@@ -1714,9 +1729,8 @@ class Job(base.Object):
                                time_finished = NULL WHERE id = %s", self.id)
 
                elif state in ("aborted", "dependency_error", "finished", "failed"):
-                       # Set finish time.
-                       self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s",
-                               self.id)
+                       # Set finish time and reset builder..
+                       self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id)
 
                        # Send messages to the user.
                        if state == "finished":
index ab1a8fbc0305128ca41b6155dc7be51389e01dd6..18b29a2a0f0b2f651134cafa436fba16d3fdd1dc 100644 (file)
                </div>
        </div>
 
-       <hr>
+       {% if jobs %}
+               <hr>
 
-       <h3>{{ _("Log") }}</h3>
-       {% module Log(builder.get_history(limit=5)) %}
+               <h3>{{ _("Active and pending jobs") }}</h3>
+               {% module JobsList(jobs) %}
+       {% end %}
+
+       {% if log %}
+               <hr>
+
+               <h3>{{ _("Log") }}</h3>
+               {% module Log(log) %}
+       {% end %}
 {% end block %}
index b344ff27efa73dd770bcdc464d6d2635dda8ca66..bcafc9e4d77b0866323863e18e2e3758fc7d7727 100644 (file)
@@ -607,20 +607,12 @@ class BuilderHandler(BuilderAuthMixin, CommonAuthHandler):
        def build_get_job(self, arches):
                # Disabled buildes do not get any jobs.
                if self.builder.disabled:
-                       logging.debug("Host requested job but is disabled: %s" \
-                               % self.builder.name)
+                       logging.debug("Host requested job but is disabled: %s" % self.builder.name)
                        return
 
-               # So do hosts where the metadata is not up to date.
-               #if self.builder.needs_update():
-               #       logging.debug("Host requested job but needs metadata update: %s" \
-               #               % self.builder.name)
-               #       return
-
                # Check if host has already too many simultaneous jobs.
-               if len(self.builder.get_active_jobs()) >= self.builder.max_jobs:
-                       logging.debug("Host has already too many jobs: %s" % \
-                               self.builder.name)
+               if self.builder.too_many_jobs:
+                       logging.debug("Host has already too many jobs: %s" % self.builder.name)
                        return
 
                # Automatically add noarch if not already present.
@@ -642,26 +634,11 @@ class BuilderHandler(BuilderAuthMixin, CommonAuthHandler):
                        supported_arches.append(arch)
 
                if not supported_arches:
-                       logging.warning("Host does not support any arches: %s" % \
-                               self.builder.name)
+                       logging.warning("Host does not support any arches: %s" % self.builder.name)
                        return
 
-               # Get all jobs from the database that can be built by this host.
-               jobs = self.pakfire.jobs.get_next_iter(states=["pending"],
-                       arches=supported_arches)
-
-               job = None
-               for _job in jobs:
-                       # Skip jobs that should not be built here.
-                       if _job.type == "test" and not "test" in self.builder.build_types:
-                               continue
-
-                       if not _job.build.type in self.builder.build_types:
-                               continue
-
-                       job = _job
-                       break
-
+               # Get the next job for this builder.
+               job = self.builder.get_next_job(supported_arches)
                if not job:
                        logging.debug("Could not find a buildable job for %s" % self.builder.name)
                        return
index 06ae96da33b4e7af5c778139141c8de1f566e086..cbbd48d9868362ebec622a430ce26cfa202e8ca3 100644 (file)
@@ -18,6 +18,7 @@ from handlers_users import *
 class IndexHandler(BaseHandler):
        def get(self):
                jobs = self.pakfire.jobs.get_active()
+               jobs += self.pakfire.jobs.get_next()
                jobs += self.pakfire.jobs.get_latest(age="24 HOUR", limit=5)
 
                # Updates
index eaab382ea3ff7323f0682d7a8e844bde1202ff25..029f15415d99d96f65df660248a1d2ce74a3e8d2 100644 (file)
@@ -17,7 +17,14 @@ class BuilderDetailHandler(BaseHandler):
        def get(self, hostname):
                builder = self.pakfire.builders.get_by_name(hostname)
 
-               self.render("builder-detail.html", builder=builder)
+               # Get running and pending jobs.
+               jobs = self.pakfire.jobs.get_active(builder=builder)
+               jobs += self.pakfire.jobs.get_next(builder=builder)
+
+               # Get log.
+               log = builder.get_history(limit=5)
+
+               self.render("builder-detail.html", builder=builder, jobs=jobs, log=log)
 
        @tornado.web.authenticated
        def post(self, hostname):