]> git.ipfire.org Git - pbs.git/commitdiff
jobs: Run dispatch less often and smarter
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 25 May 2023 09:41:45 +0000 (09:41 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 25 May 2023 09:41:45 +0000 (09:41 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/jobs.py
src/buildservice/repository.py
src/web/builders.py
src/web/jobs.py

index 80303d1eb48f0450e329265b72e80b5c8a077cfc..38c78fc51477e37e8347a9884d43098871a4e836 100644 (file)
@@ -14,6 +14,7 @@ import pakfire.config
 
 from . import base
 from . import builders
+from . import misc
 from . import users
 
 from .constants import *
@@ -98,25 +99,21 @@ class Jobs(base.Object):
                """
                        Called to launch all given jobs
                """
-               if not jobs:
-                       return
-
-               tasks = []
-
-               async with asyncio.TaskGroup() as tg:
-                       for job in jobs:
-                               task = tg.create_task(job.installcheck())
-                               tasks.append(task)
+               # Group jobs by their build repository
+               repos = misc.group(jobs, lambda job: job.build.build_repo)
 
-               # Try to dispatch any jobs afterwards if at least one task returned True
-               for task in tasks:
-                       if not task.result():
-                               continue
+               # Run the dependency check for each repository concurrently
+               async with asyncio.TaskGroup() as tasks:
+                       for repo in repos:
+                               tasks.create_task(
+                                       repo.installcheck(repos[repo])
+                               )
 
-                       await self.backend.jobs.queue.dispatch()
-                       break
 
 class Queue(base.Object):
+       # Locked when the queue is being processed
+       lock = asyncio.Lock()
+
        def init(self):
                self.db.execute("""
                        CREATE TEMPORARY VIEW job_queue AS
@@ -194,51 +191,71 @@ class Queue(base.Object):
                        Will be called regularly and will dispatch any pending jobs to any
                        available builders
                """
-               log.debug("Dispatching jobs...")
-
-               # Create a queue for all builders
-               builders = queue.SimpleQueue()
+               # Just request dispatching when we are already running
+               if self.lock.locked():
+                       self._dispatch_requested.set()
+                       return
 
-               # Add all builders to the queue in ascending order of running jobs
-               for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)):
-                       # Skip builders that are already full
-                       if builder.is_full():
-                               continue
+               # Schedule dispatch
+               self.backend.run_task(self._dispatch)
 
-                       builders.put(builder)
+       # Set when a dispatch is requested
+       _dispatch_requested = asyncio.Event()
 
-               # Run for as long as we have unprocessed builders
-               while not builders.empty():
-                       # Take the first builder
-                       builder = builders.get()
+       async def _dispatch(self):
+               async with self.lock:
+                       log.debug("Dispatching jobs...")
 
-                       log.debug("  Processing builder %s" % builder)
+                       # Clear the request
+                       self._dispatch_requested.clear()
 
-                       with self.backend.db.transaction():
-                               # We are ready for a new job
-                               job = self.pop(builder)
+                       # Create a queue for all builders
+                       builders = queue.SimpleQueue()
 
-                               # If no job could be found for this builder, we simply continue
-                               if not job:
-                                       log.debug("  No jobs processable for %s" % builder)
+                       # Add all builders to the queue in ascending order of running jobs
+                       for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)):
+                               # Skip builders that are already full
+                               if builder.is_full():
                                        continue
 
-                               # If we have a job, we dispatch it to the builder
-                               try:
-                                       builder.dispatch_job(job)
+                               builders.put(builder)
 
-                               # Ignore if the builder suddenly went away
-                               except BuilderNotOnlineError:
-                                       continue
+                       # Run for as long as we have unprocessed builders
+                       while not builders.empty():
+                               # Take the first builder
+                               builder = builders.get()
 
-                       # If the builder has not any free slots remaining, we put it back in the
-                       # queue to fill it up, but we give priority to builders who have fewer
-                       # jobs to run.
-                       if not builder.is_full():
-                               builders.put(builder)
+                               log.debug("  Processing builder %s" % builder)
 
-               # Auto-scale builders
-               await self.backend.builders.autoscale()
+                               with self.backend.db.transaction():
+                                       # We are ready for a new job
+                                       job = self.pop(builder)
+
+                                       # If no job could be found for this builder, we simply continue
+                                       if not job:
+                                               log.debug("  No jobs processable for %s" % builder)
+                                               continue
+
+                                       # If we have a job, we dispatch it to the builder
+                                       try:
+                                               builder.dispatch_job(job)
+
+                                       # Ignore if the builder suddenly went away
+                                       except BuilderNotOnlineError:
+                                               continue
+
+                               # If the builder has not any free slots remaining, we put it back in the
+                               # queue to fill it up, but we give priority to builders who have fewer
+                               # jobs to run.
+                               if not builder.is_full():
+                                       builders.put(builder)
+
+                       # Auto-scale builders
+                       await self.backend.builders.autoscale()
+
+               # Did we get asked to run again?
+               if self._dispatch_requested.is_set():
+                       await self._dispatch()
 
 
 class Job(base.DataObject):
@@ -566,7 +583,7 @@ class Job(base.DataObject):
                        self._set_attribute("aborted_by", user)
 
                # Try to dispatch more jobs in the background
-               self.backend.run_task(self.backend.jobs.queue.dispatch)
+               await self.backend.jobs.queue.dispatch()
 
        def is_aborted(self):
                """
@@ -841,7 +858,7 @@ class Job(base.DataObject):
                """
                        Perform install check
                """
-               return await self.build.build_repo.installcheck([self])
+               await self.build.build_repo.installcheck([self])
 
        def _installcheck(self, p):
                """
@@ -881,9 +898,6 @@ class Job(base.DataObject):
                # Store the timestamp
                self._set_attribute_now("installcheck_performed_at")
 
-               # Return the status
-               return self.installcheck_succeeded
-
        @property
        def installcheck_succeeded(self):
                return self.data.installcheck_succeeded
index c9643d277696d00cce7aabcfed62df3931a0b650..5dd57d237475c9ff9cbe6c6290bef74e6bcfe4e7 100644 (file)
@@ -962,11 +962,10 @@ class Repository(base.DataObject):
                log.debug("%s: Relaunching pending jobs" % self)
 
                # Perform installcheck on all pending jobs
-               success = await self.installcheck(self.pending_jobs)
+               await self.installcheck(self.pending_jobs)
 
-               # If at least one job passed the check we will try to dispatch it
-               if success:
-                       self.backend.run_task(self.backend.jobs.queue.dispatch)
+               # Request dispatch
+               await self.backend.jobs.queue.dispatch()
 
        async def installcheck(self, jobs):
                """
index df0d149a51feb285f31b8148dfef6b60b77f42ce..b2b75ec0c3ea532fb6e0033715129ce60459de74 100644 (file)
@@ -132,7 +132,7 @@ class BuilderEditHandler(base.BaseHandler):
                        builder.max_jobs = self.get_argument_int("max_jobs")
 
                # Try to dispatch more jobs
-               self.backend.run_task(self.backend.jobs.queue.dispatch)
+               await self.backend.jobs.queue.dispatch()
 
                self.redirect("/builders/%s" % builder.hostname)
 
index 4e964a190d19d5644730ba254657d2f59826fa05..55821e9a6b911b5721ec66542db88b9eb8a7d30b 100644 (file)
@@ -91,7 +91,7 @@ class APIv1FinishedHandler(base.APIMixin, tornado.web.RequestHandler):
                                logfile=logfile, packages=packages)
 
                # Try to dispatch the next job
-               self.backend.run_task(self.backend.jobs.queue.dispatch)
+               await self.backend.jobs.queue.dispatch()
 
                # Launch any (test) builds
                if builds: