From: Michael Tremer Date: Wed, 26 Apr 2023 14:37:14 +0000 (+0000) Subject: jobs: Push queue back as subclass in jobs.py X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=bc6d8b64e19158ec422e08f436b294b108d1d03e;p=pbs.git jobs: Push queue back as subclass in jobs.py Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index 6fb71306..c244ece4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -95,7 +95,6 @@ buildservice_PYTHON = \ src/buildservice/errors.py \ src/buildservice/events.py \ src/buildservice/git.py \ - src/buildservice/jobqueue.py \ src/buildservice/jobs.py \ src/buildservice/keys.py \ src/buildservice/logstreams.py \ diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index 044c3544..9646135f 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -21,7 +21,6 @@ from . import config from . import database from . import distribution from . import events -from . import jobqueue from . import jobs from . import keys from . import logstreams @@ -69,7 +68,6 @@ class Backend(object): self.builders = builders.Builders(self) self.distros = distribution.Distributions(self) self.events = events.Events(self) - self.jobqueue = jobqueue.JobQueue(self) self.keys = keys.Keys(self) self.logstreams = logstreams.LogStreams(self) self.messages = messages.Messages(self) diff --git a/src/buildservice/builders.py b/src/buildservice/builders.py index 7f100126..0503d069 100644 --- a/src/buildservice/builders.py +++ b/src/buildservice/builders.py @@ -90,7 +90,7 @@ class Builders(base.Object): # Run through all build jobs and allocate them to a builder. # If a builder is full (i.e. reaches the threshold of its build time), # we move on to the next builder until that is full and so on. - for job in self.backend.jobqueue: + for job in self.backend.jobs.queue: log.debug("Processing job %s..." % job) for builder in builders: diff --git a/src/buildservice/jobqueue.py b/src/buildservice/jobqueue.py deleted file mode 100644 index 62cc9ea8..00000000 --- a/src/buildservice/jobqueue.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/python - -import logging - -from . import base -from .errors import * - -# Setup logging -log = logging.getLogger("pbs.jobqueue") - -class JobQueue(base.Object): - def __iter__(self): - jobs = self.backend.jobs._get_jobs("SELECT jobs.* FROM job_queue queue \ - LEFT JOIN jobs ON queue.job_id = jobs.id") - - return iter(jobs) - - def __len__(self): - res = self.db.get("SELECT COUNT(*) AS len FROM job_queue") - - return res.len - - def pop(self, builder): - """ - Returns the next build job that matches the given architectures - """ - return self.backend.jobs._get_job(""" - SELECT - jobs.* - FROM - job_queue queue - LEFT JOIN - jobs ON queue.job_id = jobs.id - WHERE - queue.arch = ANY(%s) - LIMIT 1""", - builder.supported_arches, - ) - - async def dispatch_jobs(self): - """ - Will be called regularly and will dispatch any pending jobs to any - available builders - """ - log.debug("Dispatching jobs...") - - # Process all builders and assign jobs - # We prioritize builders with fewer jobs - for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)): - log.debug(" Processing builder %s" % builder) - - with self.backend.db.transaction(): - if not builder.is_ready(): - log.debug(" Builder %s is not ready" % builder) - continue - - # We are ready for a new job - job = self.pop(builder) - if job: - try: - builder.dispatch_job(job) - - # Ignore if the builder suddenly went away - except BuilderNotOnlineError: - pass - - continue - - log.debug(" No jobs processable for %s" % builder) - - # If there is no job for the builder, we might as well shut it down - await builder.stop() diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index 7d1fa09e..f2f4b8cd 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -21,6 +21,10 @@ from .decorators import * log = logging.getLogger("pbs.jobs") class Jobs(base.Object): + def init(self): + # Setup queue + self.queue = Queue(self.backend) + def _get_job(self, query, *args): res = self.db.get(query, *args) @@ -96,6 +100,84 @@ class Jobs(base.Object): await asyncio.gather(*(job.depcheck() for job in jobs)) +class Queue(base.Object): + def __iter__(self): + jobs = self.backend.jobs._get_jobs(""" + SELECT + jobs.* + FROM + job_queue + LEFT JOIN + jobs ON job_queue.job_id = jobs.id + """) + + return iter(jobs) + + def __len__(self): + res = self.db.get(""" + SELECT + COUNT(*) AS len + FROM + job_queue + """) + + if res: + return res.len + + return 0 + + def pop(self, builder): + """ + Returns the next build job that matches the given architectures + """ + return self.backend.jobs._get_job(""" + SELECT + jobs.* + FROM + job_queue + LEFT JOIN + jobs ON job_queue.job_id = jobs.id + WHERE + job_queue.arch = ANY(%s) + LIMIT 1""", + builder.supported_arches, + ) + + async def dispatch_jobs(self): + """ + Will be called regularly and will dispatch any pending jobs to any + available builders + """ + log.debug("Dispatching jobs...") + + # Process all builders and assign jobs + # We prioritize builders with fewer jobs + for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)): + log.debug(" Processing builder %s" % builder) + + with self.backend.db.transaction(): + if not builder.is_ready(): + log.debug(" Builder %s is not ready" % builder) + continue + + # We are ready for a new job + job = self.pop(builder) + if job: + try: + builder.dispatch_job(job) + + # Ignore if the builder suddenly went away + except BuilderNotOnlineError: + pass + + continue + + log.debug(" No jobs processable for %s" % builder) + + # If there is no job for the builder, we might as well shut it down + await builder.stop() + + class Job(base.DataObject): table = "jobs" @@ -354,7 +436,7 @@ class Job(base.DataObject): self._set_attribute("aborted_by", user) # Dispatch more jobs - await self.backend.jobqueue.dispatch_jobs() + await self.backend.jobs.queue.dispatch_jobs() def is_aborted(self): """ diff --git a/src/templates/index.html b/src/templates/index.html index 3fe8ffb8..230391db 100644 --- a/src/templates/index.html +++ b/src/templates/index.html @@ -28,7 +28,7 @@
- {% set q = len(backend.jobqueue) %} + {% set q = len(backend.jobs.queue) %} {{ _("One Job In Queue", "%(num)s Jobs In Queue", q) % { "num" : q } }} diff --git a/src/web/builders.py b/src/web/builders.py index e4e1ef58..af67fe76 100644 --- a/src/web/builders.py +++ b/src/web/builders.py @@ -15,7 +15,7 @@ class APIv1ControlHandler(base.APIMixin, tornado.websocket.WebSocketHandler): self.current_user.connected(self) # After the builder has connected, try to dispatch some jobs - await self.backend.jobqueue.dispatch_jobs() + await self.backend.jobs.queue.dispatch_jobs() def on_ping(self, data): log.debug("%s has pinged us" % self.builder) diff --git a/src/web/builds.py b/src/web/builds.py index 781c9263..2c97830b 100644 --- a/src/web/builds.py +++ b/src/web/builds.py @@ -56,7 +56,7 @@ class APIv1IndexHandler(base.APIMixin, base.BaseHandler): await self.backend.jobs.depcheck(build.jobs) # Try to dispatch jobs - await self.backend.jobqueue.dispatch_jobs() + await self.backend.jobs.queue.dispatch_jobs() class IndexHandler(base.BaseHandler): diff --git a/src/web/jobs.py b/src/web/jobs.py index 86cb273b..3e624ea1 100644 --- a/src/web/jobs.py +++ b/src/web/jobs.py @@ -81,7 +81,7 @@ class APIv1DetailHandler(base.APIMixin, tornado.websocket.WebSocketHandler): await self.job.finished(success=success, logfile=logfile, packages=packages) # Try to dispatch the next job - await self.backend.jobqueue.dispatch_jobs() + await self.backend.jobs.queue.dispatch_jobs() class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler): @@ -119,7 +119,7 @@ class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandle class QueueHandler(base.BaseHandler): def get(self): - self.render("queue.html", queue=self.backend.jobqueue) + self.render("queue.html", queue=self.backend.jobs.queue) class LogHandler(base.BaseHandler): diff --git a/tests/build.py b/tests/build.py index 680c5d21..56af86dd 100755 --- a/tests/build.py +++ b/tests/build.py @@ -91,38 +91,38 @@ class BuildTestCase(test.TestCase): # There should be no jobs in the queue yet, because the dependency check # has not been finished, yet - self.assertEqual(len(self.backend.jobqueue), 0) + self.assertEqual(len(self.backend.jobs.queue), 0) # Pretend the dependency check was successful for job in build.jobs: job._set_attribute("depcheck_succeeded", True) # There should now be two jobs in the queue - self.assertEqual(len(self.backend.jobqueue), 2) + self.assertEqual(len(self.backend.jobs.queue), 2) # Assign a job to a builder job1.assign(self.builder) # There should be one job left - self.assertEqual(len(self.backend.jobqueue), 1) + self.assertEqual(len(self.backend.jobs.queue), 1) # Let the job fail await job1.finished(success=False) # There should still be only one job - self.assertEqual(len(self.backend.jobqueue), 1) + self.assertEqual(len(self.backend.jobs.queue), 1) # Assign the second job job2.assign(self.builder) # The queue should now be empty - self.assertEqual(len(self.backend.jobqueue), 0) + self.assertEqual(len(self.backend.jobs.queue), 0) # Pretend the job finished successfully await job2.finished(success=True) # The queue should still be empty - self.assertEqual(len(self.backend.jobqueue), 0) + self.assertEqual(len(self.backend.jobs.queue), 0) async def test_watchers(self): """