git.ipfire.org Git - pbs.git/commitdiff
jobs: Push queue back as subclass in jobs.py
author Michael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 14:37:14 +0000 (14:37 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 14:37:14 +0000 (14:37 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
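
For orientation: this commit drops the standalone jobqueue module and hangs the queue off the jobs object instead, so every caller switches from backend.jobqueue to backend.jobs.queue. The sketch below shows only that wiring; the Object, Jobs and Backend stand-ins are deliberately simplified assumptions, not the real pbs classes.

    # A minimal sketch of the new object layout, assuming simplified
    # stand-ins for pbs' real base.Object, Jobs and Backend classes
    # (which carry far more state than shown here).

    class Object:
        def __init__(self, backend):
            self.backend = backend
            self.init()

        def init(self):
            pass

    class Queue(Object):
        """Stand-in for the Queue class added to src/buildservice/jobs.py."""

        def __len__(self):
            # The real implementation counts the rows of the job_queue relation
            return 0

    class Jobs(Object):
        def init(self):
            # Setup queue -- mirrors the hunk added to src/buildservice/jobs.py
            self.queue = Queue(self.backend)

    class Backend:
        def __init__(self):
            self.jobs = Jobs(self)
            # Note: there is no backend.jobqueue attribute any more

    backend = Backend()

    # Old access path (removed):  backend.jobqueue
    # New access path:            backend.jobs.queue
    print(len(backend.jobs.queue))      # -> 0 in this sketch
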
Makefile.am
src/buildservice/__init__.py
src/buildservice/builders.py
src/buildservice/jobqueue.py [deleted file]
src/buildservice/jobs.py
src/templates/index.html
src/web/builders.py
src/web/builds.py
src/web/jobs.py
tests/build.py

diff --git a/Makefile.am b/Makefile.am
index 6fb713069c1a5441e2a6ed0ec41e71b1ee4997fa..c244ece46a44403b98ca9f78a952b81a9d7b0eee 100644 (file)
@@ -95,7 +95,6 @@ buildservice_PYTHON = \
        src/buildservice/errors.py \
        src/buildservice/events.py \
        src/buildservice/git.py \
-       src/buildservice/jobqueue.py \
        src/buildservice/jobs.py \
        src/buildservice/keys.py \
        src/buildservice/logstreams.py \
diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py
index 044c35441f057217e5c116d337e9d22c2495d469..9646135fa16c97cecfc33c5d9d26691eb637d9d4 100644 (file)
@@ -21,7 +21,6 @@ from . import config
 from . import database
 from . import distribution
 from . import events
-from . import jobqueue
 from . import jobs
 from . import keys
 from . import logstreams
@@ -69,7 +68,6 @@ class Backend(object):
                self.builders    = builders.Builders(self)
                self.distros     = distribution.Distributions(self)
                self.events      = events.Events(self)
-               self.jobqueue    = jobqueue.JobQueue(self)
                self.keys        = keys.Keys(self)
                self.logstreams  = logstreams.LogStreams(self)
                self.messages    = messages.Messages(self)
diff --git a/src/buildservice/builders.py b/src/buildservice/builders.py
index 7f100126c785092a2a4e1a90c1664e02f82d392c..0503d069bce751f1ed33cd3b1b4a00642f5864c4 100644 (file)
@@ -90,7 +90,7 @@ class Builders(base.Object):
                # Run through all build jobs and allocate them to a builder.
                # If a builder is full (i.e. reaches the threshold of its build time),
                # we move on to the next builder until that is full and so on.
-               for job in self.backend.jobqueue:
+               for job in self.backend.jobs.queue:
                        log.debug("Processing job %s..." % job)
 
                        for builder in builders:
diff --git a/src/buildservice/jobqueue.py b/src/buildservice/jobqueue.py
deleted file mode 100644 (file)
index 62cc9ea..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/python
-
-import logging
-
-from . import base
-from .errors import *
-
-# Setup logging
-log = logging.getLogger("pbs.jobqueue")
-
-class JobQueue(base.Object):
-       def __iter__(self):
-               jobs = self.backend.jobs._get_jobs("SELECT jobs.* FROM job_queue queue \
-                       LEFT JOIN jobs ON queue.job_id = jobs.id")
-
-               return iter(jobs)
-
-       def __len__(self):
-               res = self.db.get("SELECT COUNT(*) AS len FROM job_queue")
-
-               return res.len
-
-       def pop(self, builder):
-               """
-                       Returns the next build job that matches the given architectures
-               """
-               return self.backend.jobs._get_job("""
-                       SELECT
-                               jobs.*
-                       FROM
-                               job_queue queue
-                       LEFT JOIN
-                               jobs ON queue.job_id = jobs.id
-                       WHERE
-                               queue.arch = ANY(%s)
-                       LIMIT 1""",
-                       builder.supported_arches,
-               )
-
-       async def dispatch_jobs(self):
-               """
-                       Will be called regularly and will dispatch any pending jobs to any
-                       available builders
-               """
-               log.debug("Dispatching jobs...")
-
-               # Process all builders and assign jobs
-               # We prioritize builders with fewer jobs
-               for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)):
-                       log.debug("  Processing builder %s" % builder)
-
-                       with self.backend.db.transaction():
-                               if not builder.is_ready():
-                                       log.debug("  Builder %s is not ready" % builder)
-                                       continue
-
-                               # We are ready for a new job
-                               job = self.pop(builder)
-                               if job:
-                                       try:
-                                               builder.dispatch_job(job)
-
-                                       # Ignore if the builder suddenly went away
-                                       except BuilderNotOnlineError:
-                                               pass
-
-                                       continue
-
-                               log.debug("  No jobs processable for %s" % builder)
-
-                               # If there is no job for the builder, we might as well shut it down
-                               await builder.stop()
diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py
index 7d1fa09e26b00a137e8a5de8bcf19653fc6916cd..f2f4b8cdf050c29abb69aaad86d2ad2e15e2ae1d 100644 (file)
@@ -21,6 +21,10 @@ from .decorators import *
 log = logging.getLogger("pbs.jobs")
 
 class Jobs(base.Object):
+       def init(self):
+               # Setup queue
+               self.queue = Queue(self.backend)
+
        def _get_job(self, query, *args):
                res = self.db.get(query, *args)
 
@@ -96,6 +100,84 @@ class Jobs(base.Object):
                        await asyncio.gather(*(job.depcheck() for job in jobs))
 
 
+class Queue(base.Object):
+       def __iter__(self):
+               jobs = self.backend.jobs._get_jobs("""
+                       SELECT
+                               jobs.*
+                       FROM
+                               job_queue
+                       LEFT JOIN
+                               jobs ON job_queue.job_id = jobs.id
+                       """)
+
+               return iter(jobs)
+
+       def __len__(self):
+               res = self.db.get("""
+                       SELECT
+                               COUNT(*) AS len
+                       FROM
+                               job_queue
+                       """)
+
+               if res:
+                       return res.len
+
+               return 0
+
+       def pop(self, builder):
+               """
+                       Returns the next build job that matches the given architectures
+               """
+               return self.backend.jobs._get_job("""
+                       SELECT
+                               jobs.*
+                       FROM
+                               job_queue
+                       LEFT JOIN
+                               jobs ON job_queue.job_id = jobs.id
+                       WHERE
+                               job_queue.arch = ANY(%s)
+                       LIMIT 1""",
+                       builder.supported_arches,
+               )
+
+       async def dispatch_jobs(self):
+               """
+                       Will be called regularly and will dispatch any pending jobs to any
+                       available builders
+               """
+               log.debug("Dispatching jobs...")
+
+               # Process all builders and assign jobs
+               # We prioritize builders with fewer jobs
+               for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)):
+                       log.debug("  Processing builder %s" % builder)
+
+                       with self.backend.db.transaction():
+                               if not builder.is_ready():
+                                       log.debug("  Builder %s is not ready" % builder)
+                                       continue
+
+                               # We are ready for a new job
+                               job = self.pop(builder)
+                               if job:
+                                       try:
+                                               builder.dispatch_job(job)
+
+                                       # Ignore if the builder suddenly went away
+                                       except BuilderNotOnlineError:
+                                               pass
+
+                                       continue
+
+                               log.debug("  No jobs processable for %s" % builder)
+
+                               # If there is no job for the builder, we might as well shut it down
+                               await builder.stop()
+
+
 class Job(base.DataObject):
        table = "jobs"
 
@@ -354,7 +436,7 @@ class Job(base.DataObject):
                        self._set_attribute("aborted_by", user)
 
                # Dispatch more jobs
-               await self.backend.jobqueue.dispatch_jobs()
+               await self.backend.jobs.queue.dispatch_jobs()
 
        def is_aborted(self):
                """
diff --git a/src/templates/index.html b/src/templates/index.html
index 3fe8ffb87170c7b38b64914f4156d7a6781e6278..230391db5551827f6bf7f2b305f4be23fc622def 100644 (file)
@@ -28,7 +28,7 @@
 
        <section class="section has-text-centered">
                <div class="container">
-                       {% set q = len(backend.jobqueue) %}
+                       {% set q = len(backend.jobs.queue) %}
 
                        <a href="/queue">
                                {{ _("One Job In Queue", "%(num)s Jobs In Queue", q) % { "num" : q } }}
diff --git a/src/web/builders.py b/src/web/builders.py
index e4e1ef5838a3f05bf679ed061b9a1f1a6e7c2a74..af67fe76dd4821f0661375e8487dccdf8bf43af3 100644 (file)
@@ -15,7 +15,7 @@ class APIv1ControlHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
                self.current_user.connected(self)
 
                # After the builder has connected, try to dispatch some jobs
-               await self.backend.jobqueue.dispatch_jobs()
+               await self.backend.jobs.queue.dispatch_jobs()
 
        def on_ping(self, data):
                log.debug("%s has pinged us" % self.builder)
diff --git a/src/web/builds.py b/src/web/builds.py
index 781c92636c749d42c9b512b44161d66c8a582786..2c97830be4fd837100a01ac6c9611156e4f72392 100644 (file)
@@ -56,7 +56,7 @@ class APIv1IndexHandler(base.APIMixin, base.BaseHandler):
                await self.backend.jobs.depcheck(build.jobs)
 
                # Try to dispatch jobs
-               await self.backend.jobqueue.dispatch_jobs()
+               await self.backend.jobs.queue.dispatch_jobs()
 
 
 class IndexHandler(base.BaseHandler):
diff --git a/src/web/jobs.py b/src/web/jobs.py
index 86cb273b774267765c4e2bd6d88aa8774c13cef3..3e624ea1b1378b7c564fe23804986c371e6ba129 100644 (file)
@@ -81,7 +81,7 @@ class APIv1DetailHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
                        await self.job.finished(success=success, logfile=logfile, packages=packages)
 
                # Try to dispatch the next job
-               await self.backend.jobqueue.dispatch_jobs()
+               await self.backend.jobs.queue.dispatch_jobs()
 
 
 class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
@@ -119,7 +119,7 @@ class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandle
 
 class QueueHandler(base.BaseHandler):
        def get(self):
-               self.render("queue.html", queue=self.backend.jobqueue)
+               self.render("queue.html", queue=self.backend.jobs.queue)
 
 
 class LogHandler(base.BaseHandler):
diff --git a/tests/build.py b/tests/build.py
index 680c5d215b90aceb68a1a51d3e0de7722fc6a35e..56af86ddbe071e0ffee069d33f7ac01c947f41f4 100755 (executable)
@@ -91,38 +91,38 @@ class BuildTestCase(test.TestCase):
 
                # There should be no jobs in the queue yet, because the dependency check
                # has not been finished, yet
-               self.assertEqual(len(self.backend.jobqueue), 0)
+               self.assertEqual(len(self.backend.jobs.queue), 0)
 
                # Pretend the dependency check was successful
                for job in build.jobs:
                        job._set_attribute("depcheck_succeeded", True)
 
                # There should now be two jobs in the queue
-               self.assertEqual(len(self.backend.jobqueue), 2)
+               self.assertEqual(len(self.backend.jobs.queue), 2)
 
                # Assign a job to a builder
                job1.assign(self.builder)
 
                # There should be one job left
-               self.assertEqual(len(self.backend.jobqueue), 1)
+               self.assertEqual(len(self.backend.jobs.queue), 1)
 
                # Let the job fail
                await job1.finished(success=False)
 
                # There should still be only one job
-               self.assertEqual(len(self.backend.jobqueue), 1)
+               self.assertEqual(len(self.backend.jobs.queue), 1)
 
                # Assign the second job
                job2.assign(self.builder)
 
                # The queue should now be empty
-               self.assertEqual(len(self.backend.jobqueue), 0)
+               self.assertEqual(len(self.backend.jobs.queue), 0)
 
                # Pretend the job finished successfully
                await job2.finished(success=True)
 
                # The queue should still be empty
-               self.assertEqual(len(self.backend.jobqueue), 0)
+               self.assertEqual(len(self.backend.jobs.queue), 0)
 
        async def test_watchers(self):
                """