From: Michael Tremer Date: Fri, 21 Dec 2012 15:44:11 +0000 (+0100) Subject: Rewrite bigger chunks of the job schedulung code. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=163d9d8b6c28e162a3479281f927eec2b2a73fe1;p=pbs.git Rewrite bigger chunks of the job schedulung code. This patch fixes #10164 and some more parts have been rewritten/duplicate code has been removed. --- diff --git a/backend/builders.py b/backend/builders.py index b27fea1a..9dcf7870 100644 --- a/backend/builders.py +++ b/backend/builders.py @@ -539,30 +539,36 @@ class Builder(base.Object): return "online" - def get_active_jobs(self, count=False): - query = self.db.query("\ - SELECT * FROM jobs \ - WHERE \ - jobs.builder_id = %s AND \ - (jobs.state = 'dispatching' OR jobs.state = 'running' OR jobs.state = 'uploading') \ - ORDER BY time_started ASC", - self.id) + def get_active_jobs(self, *args, **kwargs): + if self._active_jobs is None: + self._active_jobs = self.pakfire.jobs.get_active(builder=self, *args, **kwargs) + + return self._active_jobs + + def count_active_jobs(self): + return len(self.get_active_jobs()) + + @property + def too_many_jobs(self): + """ + Tell if this host is already running enough or too many jobs. + """ + return self.count_active_jobs() >= self.max_jobs - if count: - return len(query) + def get_next_jobs(self, arches=None, limit=None): + if arches is None: + arches = self.get_arches() - jobs = [] - for job in query: - job = self.pakfire.jobs.get_by_id(job.id, job) - jobs.append(job) + return self.pakfire.jobs.get_next(arches=arches, builder=self, + state="pending", limit=limit) - return jobs + def get_next_job(self, *args, **kwargs): + kwargs["limit"] = 1 - def count_active_jobs(self): - if self._active_jobs is None: - self._active_jobs = self.get_active_jobs(count=True) + jobs = self.get_next_jobs(*args, **kwargs) - return self._active_jobs + if jobs: + return jobs[0] def get_history(self, *args, **kwargs): kwargs["builder"] = self diff --git a/backend/builds.py b/backend/builds.py index 403fe951..e67be2fa 100644 --- a/backend/builds.py +++ b/backend/builds.py @@ -663,7 +663,6 @@ class Build(base.Object): if self._data: self._data["state"] = state - self.clear_cache() # In broken state, the removal from the repository is forced and # all jobs that are not finished yet will be aborted. @@ -1303,17 +1302,15 @@ class Jobs(base.Object): # Return sorted list of jobs. return sorted(jobs) - def get_active(self, host_id=None, uploads=True, running_only=False): - running_states = ["dispatching", "running"] - - if not running_only: - running_states += ["new", "pending",] + def get_active(self, host_id=None, builder=None, states=None): + if builder: + host_id = builder.id - if uploads: - running_states.append("uploading") + if states is None: + states = ["dispatching", "running", "uploading"] - query = "SELECT * FROM jobs WHERE (%s)" % \ - " OR ".join(["state = '%s'" % s for s in running_states]) + query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states)) + args = states if host_id: query += " AND builder_id = %s" % host_id @@ -1327,40 +1324,72 @@ class Jobs(base.Object): WHEN jobs.state = 'new' THEN 4 \ END, time_started ASC" - return [Job(self.pakfire, j.id, j) for j in self.db.query(query)] + return [Job(self.pakfire, j.id, j) for j in self.db.query(query, *args)] - def get_next_iter(self, arches=None, limit=None, offset=None, type=None, states=["pending", "new"], max_tries=None): - args = [] - conditions = [ - "(start_not_before IS NULL OR start_not_before <= NOW())", - ] + def get_next_iter(self, *args, **kwargs): + return iter(self.get_next(*args, **kwargs)) - if type: - conditions.append("jobs.type = %s") - args.append(type) + def get_next(self, arches=None, builder=None, limit=None, offset=None, type=None, + state=None, states=None, max_tries=None): - if states: - conditions.append("(%s)" % " OR ".join(["jobs.state = %s" for state in states])) - args += states + if state is None and states is None: + states = ["pending", "new"] + + if builder and arches is None: + arches = builder.get_arches() + + query = "SELECT jobs.* FROM jobs \ + JOIN builds ON jobs.build_id = builds.id \ + WHERE \ + (start_not_before IS NULL OR start_not_before <= NOW())" + args = [] if arches: - conditions.append("(%s)" % " OR ".join(["jobs.arch_id = %s" for a in arches])) - args += [a.id for a in arches] + query += " AND jobs.arch_id IN (%s)" % ", ".join(["%s"] * len(arches)) + args.extend([a.id for a in arches]) + + if builder: + #query += " AND (jobs.builder_id = %s OR jobs.builder_id IS NULL)" + #args.append(builder.id) + + # Check out which types of builds this builder builds. + build_types = [] + for build_type in builder.build_types: + if build_type == "release": + build_types.append("(builds.type = 'release' AND jobs.type = 'build')") + elif build_type == "scratch": + build_types.append("(builds.type = 'scratch' AND jobs.type = 'build')") + elif build_type == "test": + build_types.append("jobs.type = 'test'") + + if build_types: + query += " AND (%s)" % " OR ".join(build_types) - # Only return jobs with up to max_tries tries. if max_tries: - conditions.append("jobs.tries <= %s") + query += " AND jobs.max_tries <= %s" args.append(max_tries) - query = "SELECT jobs.* FROM jobs \ - JOIN builds ON jobs.build_id = builds.id" + if state: + query += " AND jobs.state = %s" + args.append(state) - if conditions: - query += " WHERE %s" % " AND ".join(conditions) + if states: + query += " AND jobs.state IN (%s)" % ", ".join(["%s"] * len(states)) + args.extend(states) + + if type: + query += " AND jobs.type = %s" + args.append(type) + + # Order builds. + # Release builds and scratch builds are more important than test builds. + # Builds are sorted by priority and older builds are preferred. - # Choose the oldest one at first, but prefer real builds instead of - # test builds. query += " ORDER BY \ + CASE \ + WHEN jobs.state = 'pending' THEN 0 \ + WHEN jobs.state = 'new' THEN 1 \ + END, \ CASE \ WHEN jobs.type = 'build' THEN 0 \ WHEN jobs.type = 'test' THEN 1 \ @@ -1368,21 +1397,12 @@ class Jobs(base.Object): builds.priority DESC, jobs.time_created ASC" if limit: - if offset: - query += " LIMIT %s,%s" - args += [limit, offset] - else: - query += " LIMIT %s" - args += [limit] - - for job in self.db.query(query, *args): - yield Job(self.pakfire, job.id, job) + query += " LIMIT %s" + args.append(limit) - def get_next(self, *args, **kwargs): jobs = [] - - # Fetch all objects right now. - for job in self.get_next_iter(*args, **kwargs): + for row in self.db.query(query, *args): + job = self.pakfire.jobs.get_by_id(row.id, row) jobs.append(job) return jobs @@ -1580,12 +1600,7 @@ class Job(base.Object): @property def data(self): if self._data is None: - data = self.cache.get(self.cache_key) - if not data: - data = self.db.get("SELECT * FROM jobs WHERE id = %s", self.id) - self.cache.set(self.cache_key, data) - - self._data = data + self._data = self.db.get("SELECT * FROM jobs WHERE id = %s", self.id) assert self._data return self._data @@ -1714,9 +1729,8 @@ class Job(base.Object): time_finished = NULL WHERE id = %s", self.id) elif state in ("aborted", "dependency_error", "finished", "failed"): - # Set finish time. - self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", - self.id) + # Set finish time and reset builder.. + self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id) # Send messages to the user. if state == "finished": diff --git a/data/templates/builder-detail.html b/data/templates/builder-detail.html index ab1a8fbc..18b29a2a 100644 --- a/data/templates/builder-detail.html +++ b/data/templates/builder-detail.html @@ -192,8 +192,17 @@ -
+ {% if jobs %} +
-

{{ _("Log") }}

- {% module Log(builder.get_history(limit=5)) %} +

{{ _("Active and pending jobs") }}

+ {% module JobsList(jobs) %} + {% end %} + + {% if log %} +
+ +

{{ _("Log") }}

+ {% module Log(log) %} + {% end %} {% end block %} diff --git a/hub/handlers.py b/hub/handlers.py index b344ff27..bcafc9e4 100644 --- a/hub/handlers.py +++ b/hub/handlers.py @@ -607,20 +607,12 @@ class BuilderHandler(BuilderAuthMixin, CommonAuthHandler): def build_get_job(self, arches): # Disabled buildes do not get any jobs. if self.builder.disabled: - logging.debug("Host requested job but is disabled: %s" \ - % self.builder.name) + logging.debug("Host requested job but is disabled: %s" % self.builder.name) return - # So do hosts where the metadata is not up to date. - #if self.builder.needs_update(): - # logging.debug("Host requested job but needs metadata update: %s" \ - # % self.builder.name) - # return - # Check if host has already too many simultaneous jobs. - if len(self.builder.get_active_jobs()) >= self.builder.max_jobs: - logging.debug("Host has already too many jobs: %s" % \ - self.builder.name) + if self.builder.too_many_jobs: + logging.debug("Host has already too many jobs: %s" % self.builder.name) return # Automatically add noarch if not already present. @@ -642,26 +634,11 @@ class BuilderHandler(BuilderAuthMixin, CommonAuthHandler): supported_arches.append(arch) if not supported_arches: - logging.warning("Host does not support any arches: %s" % \ - self.builder.name) + logging.warning("Host does not support any arches: %s" % self.builder.name) return - # Get all jobs from the database that can be built by this host. - jobs = self.pakfire.jobs.get_next_iter(states=["pending"], - arches=supported_arches) - - job = None - for _job in jobs: - # Skip jobs that should not be built here. - if _job.type == "test" and not "test" in self.builder.build_types: - continue - - if not _job.build.type in self.builder.build_types: - continue - - job = _job - break - + # Get the next job for this builder. + job = self.builder.get_next_job(supported_arches) if not job: logging.debug("Could not find a buildable job for %s" % self.builder.name) return diff --git a/web/handlers.py b/web/handlers.py index 06ae96da..cbbd48d9 100644 --- a/web/handlers.py +++ b/web/handlers.py @@ -18,6 +18,7 @@ from handlers_users import * class IndexHandler(BaseHandler): def get(self): jobs = self.pakfire.jobs.get_active() + jobs += self.pakfire.jobs.get_next() jobs += self.pakfire.jobs.get_latest(age="24 HOUR", limit=5) # Updates diff --git a/web/handlers_builders.py b/web/handlers_builders.py index eaab382e..029f1541 100644 --- a/web/handlers_builders.py +++ b/web/handlers_builders.py @@ -17,7 +17,14 @@ class BuilderDetailHandler(BaseHandler): def get(self, hostname): builder = self.pakfire.builders.get_by_name(hostname) - self.render("builder-detail.html", builder=builder) + # Get running and pending jobs. + jobs = self.pakfire.jobs.get_active(builder=builder) + jobs += self.pakfire.jobs.get_next(builder=builder) + + # Get log. + log = builder.get_history(limit=5) + + self.render("builder-detail.html", builder=builder, jobs=jobs, log=log) @tornado.web.authenticated def post(self, hostname):