From: Michael Tremer Date: Thu, 25 May 2023 09:41:45 +0000 (+0000) Subject: jobs: Run dispatch less often and smarter X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=815e17d07347d0e0d73e7e77f95506757ac1fc90;p=pbs.git jobs: Run dispatch less often and smarter Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index 80303d1e..38c78fc5 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -14,6 +14,7 @@ import pakfire.config from . import base from . import builders +from . import misc from . import users from .constants import * @@ -98,25 +99,21 @@ class Jobs(base.Object): """ Called to launch all given jobs """ - if not jobs: - return - - tasks = [] - - async with asyncio.TaskGroup() as tg: - for job in jobs: - task = tg.create_task(job.installcheck()) - tasks.append(task) + # Group jobs by their build repository + repos = misc.group(jobs, lambda job: job.build.build_repo) - # Try to dispatch any jobs afterwards if at least one task returned True - for task in tasks: - if not task.result(): - continue + # Run the dependency check for each repository concurrently + async with asyncio.TaskGroup() as tasks: + for repo in repos: + tasks.create_task( + repo.installcheck(repos[repo]) + ) - await self.backend.jobs.queue.dispatch() - break class Queue(base.Object): + # Locked when the queue is being processed + lock = asyncio.Lock() + def init(self): self.db.execute(""" CREATE TEMPORARY VIEW job_queue AS @@ -194,51 +191,71 @@ class Queue(base.Object): Will be called regularly and will dispatch any pending jobs to any available builders """ - log.debug("Dispatching jobs...") - - # Create a queue for all builders - builders = queue.SimpleQueue() + # Just request dispatching when we are already running + if self.lock.locked(): + self._dispatch_requested.set() + return - # Add all builders to the queue in ascending order of running jobs - for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)): - # Skip builders that are already full - if builder.is_full(): - continue + # Schedule dispatch + self.backend.run_task(self._dispatch) - builders.put(builder) + # Set when a dispatch is requested + _dispatch_requested = asyncio.Event() - # Run for as long as we have unprocessed builders - while not builders.empty(): - # Take the first builder - builder = builders.get() + async def _dispatch(self): + async with self.lock: + log.debug("Dispatching jobs...") - log.debug(" Processing builder %s" % builder) + # Clear the request + self._dispatch_requested.clear() - with self.backend.db.transaction(): - # We are ready for a new job - job = self.pop(builder) + # Create a queue for all builders + builders = queue.SimpleQueue() - # If no job could be found for this builder, we simply continue - if not job: - log.debug(" No jobs processable for %s" % builder) + # Add all builders to the queue in ascending order of running jobs + for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)): + # Skip builders that are already full + if builder.is_full(): continue - # If we have a job, we dispatch it to the builder - try: - builder.dispatch_job(job) + builders.put(builder) - # Ignore if the builder suddenly went away - except BuilderNotOnlineError: - continue + # Run for as long as we have unprocessed builders + while not builders.empty(): + # Take the first builder + builder = builders.get() - # If the builder has not any free slots remaining, we put it back in the - # queue to fill it up, but we give priority to builders who have fewer - # jobs to run. - if not builder.is_full(): - builders.put(builder) + log.debug(" Processing builder %s" % builder) - # Auto-scale builders - await self.backend.builders.autoscale() + with self.backend.db.transaction(): + # We are ready for a new job + job = self.pop(builder) + + # If no job could be found for this builder, we simply continue + if not job: + log.debug(" No jobs processable for %s" % builder) + continue + + # If we have a job, we dispatch it to the builder + try: + builder.dispatch_job(job) + + # Ignore if the builder suddenly went away + except BuilderNotOnlineError: + continue + + # If the builder has not any free slots remaining, we put it back in the + # queue to fill it up, but we give priority to builders who have fewer + # jobs to run. + if not builder.is_full(): + builders.put(builder) + + # Auto-scale builders + await self.backend.builders.autoscale() + + # Did we get asked to run again? + if self._dispatch_requested.is_set(): + await self._dispatch() class Job(base.DataObject): @@ -566,7 +583,7 @@ class Job(base.DataObject): self._set_attribute("aborted_by", user) # Try to dispatch more jobs in the background - self.backend.run_task(self.backend.jobs.queue.dispatch) + await self.backend.jobs.queue.dispatch() def is_aborted(self): """ @@ -841,7 +858,7 @@ class Job(base.DataObject): """ Perform install check """ - return await self.build.build_repo.installcheck([self]) + await self.build.build_repo.installcheck([self]) def _installcheck(self, p): """ @@ -881,9 +898,6 @@ class Job(base.DataObject): # Store the timestamp self._set_attribute_now("installcheck_performed_at") - # Return the status - return self.installcheck_succeeded - @property def installcheck_succeeded(self): return self.data.installcheck_succeeded diff --git a/src/buildservice/repository.py b/src/buildservice/repository.py index c9643d27..5dd57d23 100644 --- a/src/buildservice/repository.py +++ b/src/buildservice/repository.py @@ -962,11 +962,10 @@ class Repository(base.DataObject): log.debug("%s: Relaunching pending jobs" % self) # Perform installcheck on all pending jobs - success = await self.installcheck(self.pending_jobs) + await self.installcheck(self.pending_jobs) - # If at least one job passed the check we will try to dispatch it - if success: - self.backend.run_task(self.backend.jobs.queue.dispatch) + # Request dispatch + await self.backend.jobs.queue.dispatch() async def installcheck(self, jobs): """ diff --git a/src/web/builders.py b/src/web/builders.py index df0d149a..b2b75ec0 100644 --- a/src/web/builders.py +++ b/src/web/builders.py @@ -132,7 +132,7 @@ class BuilderEditHandler(base.BaseHandler): builder.max_jobs = self.get_argument_int("max_jobs") # Try to dispatch more jobs - self.backend.run_task(self.backend.jobs.queue.dispatch) + await self.backend.jobs.queue.dispatch() self.redirect("/builders/%s" % builder.hostname) diff --git a/src/web/jobs.py b/src/web/jobs.py index 4e964a19..55821e9a 100644 --- a/src/web/jobs.py +++ b/src/web/jobs.py @@ -91,7 +91,7 @@ class APIv1FinishedHandler(base.APIMixin, tornado.web.RequestHandler): logfile=logfile, packages=packages) # Try to dispatch the next job - self.backend.run_task(self.backend.jobs.queue.dispatch) + await self.backend.jobs.queue.dispatch() # Launch any (test) builds if builds: