from . import base
from . import builders
+from . import misc
from . import users
from .constants import *
"""
Called to launch all given jobs
"""
- if not jobs:
- return
-
- tasks = []
-
- async with asyncio.TaskGroup() as tg:
- for job in jobs:
- task = tg.create_task(job.installcheck())
- tasks.append(task)
+ # Group jobs by their build repository
+ repos = misc.group(jobs, lambda job: job.build.build_repo)
- # Try to dispatch any jobs afterwards if at least one task returned True
- for task in tasks:
- if not task.result():
- continue
+ # Run the dependency check for each repository concurrently
+ async with asyncio.TaskGroup() as tasks:
+ for repo in repos:
+ tasks.create_task(
+ repo.installcheck(repos[repo])
+ )
- await self.backend.jobs.queue.dispatch()
- break
class Queue(base.Object):
+ # Locked when the queue is being processed
+ lock = asyncio.Lock()
+
def init(self):
self.db.execute("""
CREATE TEMPORARY VIEW job_queue AS
Will be called regularly and will dispatch any pending jobs to any
available builders
"""
- log.debug("Dispatching jobs...")
-
- # Create a queue for all builders
- builders = queue.SimpleQueue()
+ # Just request dispatching when we are already running
+ if self.lock.locked():
+ self._dispatch_requested.set()
+ return
- # Add all builders to the queue in ascending order of running jobs
- for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)):
- # Skip builders that are already full
- if builder.is_full():
- continue
+ # Schedule dispatch
+ self.backend.run_task(self._dispatch)
- builders.put(builder)
+ # Set when a dispatch is requested
+ _dispatch_requested = asyncio.Event()
- # Run for as long as we have unprocessed builders
- while not builders.empty():
- # Take the first builder
- builder = builders.get()
+ async def _dispatch(self):
+ async with self.lock:
+ log.debug("Dispatching jobs...")
- log.debug(" Processing builder %s" % builder)
+ # Clear the request
+ self._dispatch_requested.clear()
- with self.backend.db.transaction():
- # We are ready for a new job
- job = self.pop(builder)
+ # Create a queue for all builders
+ builders = queue.SimpleQueue()
- # If no job could be found for this builder, we simply continue
- if not job:
- log.debug(" No jobs processable for %s" % builder)
+ # Add all builders to the queue in ascending order of running jobs
+ for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)):
+ # Skip builders that are already full
+ if builder.is_full():
continue
- # If we have a job, we dispatch it to the builder
- try:
- builder.dispatch_job(job)
+ builders.put(builder)
- # Ignore if the builder suddenly went away
- except BuilderNotOnlineError:
- continue
+ # Run for as long as we have unprocessed builders
+ while not builders.empty():
+ # Take the first builder
+ builder = builders.get()
- # If the builder has not any free slots remaining, we put it back in the
- # queue to fill it up, but we give priority to builders who have fewer
- # jobs to run.
- if not builder.is_full():
- builders.put(builder)
+ log.debug(" Processing builder %s" % builder)
- # Auto-scale builders
- await self.backend.builders.autoscale()
+ with self.backend.db.transaction():
+ # We are ready for a new job
+ job = self.pop(builder)
+
+ # If no job could be found for this builder, we simply continue
+ if not job:
+ log.debug(" No jobs processable for %s" % builder)
+ continue
+
+ # If we have a job, we dispatch it to the builder
+ try:
+ builder.dispatch_job(job)
+
+ # Ignore if the builder suddenly went away
+ except BuilderNotOnlineError:
+ continue
+
+ # If the builder has not any free slots remaining, we put it back in the
+ # queue to fill it up, but we give priority to builders who have fewer
+ # jobs to run.
+ if not builder.is_full():
+ builders.put(builder)
+
+ # Auto-scale builders
+ await self.backend.builders.autoscale()
+
+ # Did we get asked to run again?
+ if self._dispatch_requested.is_set():
+ await self._dispatch()
class Job(base.DataObject):
self._set_attribute("aborted_by", user)
# Try to dispatch more jobs in the background
- self.backend.run_task(self.backend.jobs.queue.dispatch)
+ await self.backend.jobs.queue.dispatch()
def is_aborted(self):
"""
"""
Perform install check
"""
- return await self.build.build_repo.installcheck([self])
+ await self.build.build_repo.installcheck([self])
def _installcheck(self, p):
"""
# Store the timestamp
self._set_attribute_now("installcheck_performed_at")
- # Return the status
- return self.installcheck_succeeded
-
@property
def installcheck_succeeded(self):
return self.data.installcheck_succeeded