src/buildservice/errors.py \
src/buildservice/events.py \
src/buildservice/git.py \
- src/buildservice/jobqueue.py \
src/buildservice/jobs.py \
src/buildservice/keys.py \
src/buildservice/logstreams.py \
from . import database
from . import distribution
from . import events
-from . import jobqueue
from . import jobs
from . import keys
from . import logstreams
self.builders = builders.Builders(self)
self.distros = distribution.Distributions(self)
self.events = events.Events(self)
- self.jobqueue = jobqueue.JobQueue(self)
self.keys = keys.Keys(self)
self.logstreams = logstreams.LogStreams(self)
self.messages = messages.Messages(self)
# Run through all build jobs and allocate them to a builder.
# If a builder is full (i.e. reaches the threshold of its build time),
# we move on to the next builder until that is full and so on.
- for job in self.backend.jobqueue:
+ for job in self.backend.jobs.queue:
log.debug("Processing job %s..." % job)
for builder in builders:
+++ /dev/null
-#!/usr/bin/python
-
-import logging
-
-from . import base
-from .errors import *
-
-# Setup logging
-log = logging.getLogger("pbs.jobqueue")
-
-class JobQueue(base.Object):
- def __iter__(self):
- jobs = self.backend.jobs._get_jobs("SELECT jobs.* FROM job_queue queue \
- LEFT JOIN jobs ON queue.job_id = jobs.id")
-
- return iter(jobs)
-
- def __len__(self):
- res = self.db.get("SELECT COUNT(*) AS len FROM job_queue")
-
- return res.len
-
- def pop(self, builder):
- """
- Returns the next build job that matches the given architectures
- """
- return self.backend.jobs._get_job("""
- SELECT
- jobs.*
- FROM
- job_queue queue
- LEFT JOIN
- jobs ON queue.job_id = jobs.id
- WHERE
- queue.arch = ANY(%s)
- LIMIT 1""",
- builder.supported_arches,
- )
-
- async def dispatch_jobs(self):
- """
- Will be called regularly and will dispatch any pending jobs to any
- available builders
- """
- log.debug("Dispatching jobs...")
-
- # Process all builders and assign jobs
- # We prioritize builders with fewer jobs
- for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)):
- log.debug(" Processing builder %s" % builder)
-
- with self.backend.db.transaction():
- if not builder.is_ready():
- log.debug(" Builder %s is not ready" % builder)
- continue
-
- # We are ready for a new job
- job = self.pop(builder)
- if job:
- try:
- builder.dispatch_job(job)
-
- # Ignore if the builder suddenly went away
- except BuilderNotOnlineError:
- pass
-
- continue
-
- log.debug(" No jobs processable for %s" % builder)
-
- # If there is no job for the builder, we might as well shut it down
- await builder.stop()
log = logging.getLogger("pbs.jobs")
class Jobs(base.Object):
+	def init(self):
+		"""
+		Second-stage initialisation.
+
+		NOTE(review): presumably invoked by base.Object/the backend after
+		construction — confirm; not visible in this patch.
+		"""
+		# Setup queue so that it is reachable as backend.jobs.queue
+		self.queue = Queue(self.backend)
+
def _get_job(self, query, *args):
res = self.db.get(query, *args)
await asyncio.gather(*(job.depcheck() for job in jobs))
+class Queue(base.Object):
+	"""
+	The queue of build jobs that are ready to be dispatched.
+
+	Thin accessor around the "job_queue" relation, which references jobs
+	by job_id and carries an arch column; iteration, len() and pop() all
+	read straight from the database, so the queue is never cached.
+	"""
+	def __iter__(self):
+		# job_queue only carries references, so join against jobs to
+		# materialise the full job objects
+		jobs = self.backend.jobs._get_jobs("""
+			SELECT
+				jobs.*
+			FROM
+				job_queue
+			LEFT JOIN
+				jobs ON job_queue.job_id = jobs.id
+			""")
+
+		return iter(jobs)
+
+	def __len__(self):
+		res = self.db.get("""
+			SELECT
+				COUNT(*) AS len
+			FROM
+				job_queue
+			""")
+
+		# Guard against db.get() returning nothing
+		if res:
+			return res.len
+
+		return 0
+
+	def pop(self, builder):
+		"""
+		Returns the next build job that matches the given builder's
+		supported architectures, or None if there is nothing to do.
+
+		NOTE(review): the query has no ORDER BY, so which queued job is
+		returned first is unspecified — confirm that is intended.
+		"""
+		return self.backend.jobs._get_job("""
+			SELECT
+				jobs.*
+			FROM
+				job_queue
+			LEFT JOIN
+				jobs ON job_queue.job_id = jobs.id
+			WHERE
+				job_queue.arch = ANY(%s)
+			LIMIT 1""",
+			builder.supported_arches,
+		)
+
+	async def dispatch_jobs(self):
+		"""
+		Will be called regularly and will dispatch any pending jobs to any
+		available builders.
+
+		Builders that end up without a job are shut down.
+		"""
+		log.debug("Dispatching jobs...")
+
+		# Process all builders and assign jobs
+		# We prioritize builders with fewer jobs
+		for builder in sorted(self.backend.builders.connections, key=lambda b: len(b.jobs)):
+			log.debug(" Processing builder %s" % builder)
+
+			# Each builder is handled in its own transaction so one
+			# failure does not roll back earlier assignments
+			with self.backend.db.transaction():
+				if not builder.is_ready():
+					log.debug(" Builder %s is not ready" % builder)
+					continue
+
+				# We are ready for a new job
+				job = self.pop(builder)
+				if job:
+					try:
+						builder.dispatch_job(job)
+
+					# Ignore if the builder suddenly went away
+					except BuilderNotOnlineError:
+						pass
+
+					# Job handed out (or builder vanished) — keep the
+					# builder running and move on to the next one
+					continue
+
+				log.debug(" No jobs processable for %s" % builder)
+
+				# If there is no job for the builder, we might as well shut it down
+				await builder.stop()
+
+
+
+
class Job(base.DataObject):
table = "jobs"
self._set_attribute("aborted_by", user)
# Dispatch more jobs
- await self.backend.jobqueue.dispatch_jobs()
+ await self.backend.jobs.queue.dispatch_jobs()
def is_aborted(self):
"""
<section class="section has-text-centered">
<div class="container">
- {% set q = len(backend.jobqueue) %}
+ {% set q = len(backend.jobs.queue) %}
<a href="/queue">
{{ _("One Job In Queue", "%(num)s Jobs In Queue", q) % { "num" : q } }}
self.current_user.connected(self)
# After the builder has connected, try to dispatch some jobs
- await self.backend.jobqueue.dispatch_jobs()
+ await self.backend.jobs.queue.dispatch_jobs()
def on_ping(self, data):
log.debug("%s has pinged us" % self.builder)
await self.backend.jobs.depcheck(build.jobs)
# Try to dispatch jobs
- await self.backend.jobqueue.dispatch_jobs()
+ await self.backend.jobs.queue.dispatch_jobs()
class IndexHandler(base.BaseHandler):
await self.job.finished(success=success, logfile=logfile, packages=packages)
# Try to dispatch the next job
- await self.backend.jobqueue.dispatch_jobs()
+ await self.backend.jobs.queue.dispatch_jobs()
class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
class QueueHandler(base.BaseHandler):
def get(self):
- self.render("queue.html", queue=self.backend.jobqueue)
+ self.render("queue.html", queue=self.backend.jobs.queue)
class LogHandler(base.BaseHandler):
# There should be no jobs in the queue yet, because the dependency check
# has not been finished, yet
- self.assertEqual(len(self.backend.jobqueue), 0)
+ self.assertEqual(len(self.backend.jobs.queue), 0)
# Pretend the dependency check was successful
for job in build.jobs:
job._set_attribute("depcheck_succeeded", True)
# There should now be two jobs in the queue
- self.assertEqual(len(self.backend.jobqueue), 2)
+ self.assertEqual(len(self.backend.jobs.queue), 2)
# Assign a job to a builder
job1.assign(self.builder)
# There should be one job left
- self.assertEqual(len(self.backend.jobqueue), 1)
+ self.assertEqual(len(self.backend.jobs.queue), 1)
# Let the job fail
await job1.finished(success=False)
# There should still be only one job
- self.assertEqual(len(self.backend.jobqueue), 1)
+ self.assertEqual(len(self.backend.jobs.queue), 1)
# Assign the second job
job2.assign(self.builder)
# The queue should now be empty
- self.assertEqual(len(self.backend.jobqueue), 0)
+ self.assertEqual(len(self.backend.jobs.queue), 0)
# Pretend the job finished successfully
await job2.finished(success=True)
# The queue should still be empty
- self.assertEqual(len(self.backend.jobqueue), 0)
+ self.assertEqual(len(self.backend.jobs.queue), 0)
async def test_watchers(self):
"""