From 956191ed6749a7bd655f2f548edb58f70249ff6d Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Wed, 22 Jan 2025 13:00:31 +0000 Subject: [PATCH] builders: Refactor dispatching jobs Signed-off-by: Michael Tremer --- src/buildservice/builders.py | 18 ------------------ src/buildservice/jobs.py | 31 +++++++++++++++++++++++++------ 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/buildservice/builders.py b/src/buildservice/builders.py index f3fc80c7..fd2930fa 100644 --- a/src/buildservice/builders.py +++ b/src/buildservice/builders.py @@ -211,24 +211,6 @@ class Builders(base.Object): return await self.db.fetch_one(stmt) - @property - def connected(self): - """ - Returns all connected builders - """ - # We do this so that we get a fresh builder object just in case - # some configuration has changed in the database. - for builder in self: - # Skip disabled builders - if not builder.enabled: - continue - - # Skip all builders that don't have an open connection - if not builder in self.connections: - continue - - yield builder - async def autoscale(self, wait=False): """ This method performs two tasks: diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index 284c1cfc..2f9e644a 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -297,16 +297,35 @@ class Queue(base.Object): # Clear the request self._dispatch_requested.clear() - # Create a queue for all builders - builders = queue.SimpleQueue() + builders = [] # Add all builders to the queue in ascending order of running jobs - for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)): - # Skip builders that are already full - if builder.is_full(): + async for builder in self.backend.builders: + # Skip any builders that are not enabled + if not builder.enabled: continue - builders.put(builder) + # Skip any builders that are not connected + elif not builder.is_connected(): + continue + + # Skip any builders that are already full + elif builder.is_full(): + continue + + # Fetch all jobs this builder is currently running + jobs = await builder.get_jobs() + + # Add the builder to the list and include the number of running jobs + builders.append( + len(jobs), builder, + ) + + # Create a queue for all builders and put those + # that have the fewest jobs at the beginning. + builders = queue.SimpleQueue( + [builder for jobs, builder in sorted(builders)], + ) # Run for as long as we have unprocessed builders while not builders.empty(): -- 2.47.2