return await self.db.fetch_one(stmt)
- @property
- def connected(self):
- """
- Returns all connected builders
- """
- # We do this so that we get a fresh builder object just in case
- # some configuration has changed in the database.
- for builder in self:
- # Skip disabled builders
- if not builder.enabled:
- continue
-
- # Skip all builders that don't have an open connection
- if not builder in self.connections:
- continue
-
- yield builder
-
async def autoscale(self, wait=False):
"""
This method performs two tasks:
# Clear the request
self._dispatch_requested.clear()
- # Create a queue for all builders
- builders = queue.SimpleQueue()
+ builders = []
# Add all builders to the queue in ascending order of running jobs
- for builder in sorted(self.backend.builders.connected, key=lambda b: len(b.jobs)):
- # Skip builders that are already full
- if builder.is_full():
+ async for builder in self.backend.builders:
+ # Skip any builders that are not enabled
+ if not builder.enabled:
continue
- builders.put(builder)
+ # Skip any builders that are not connected
+ elif not builder.is_connected():
+ continue
+
+ # Skip any builders that are already full
+ elif builder.is_full():
+ continue
+
+ # Fetch all jobs this builder is currently running
+ jobs = await builder.get_jobs()
+
+ # Add the builder to the list and include the number of running jobs
+ builders.append(
+ len(jobs), builder,
+ )
+
+ # Create a queue for all builders and put those
+ # that have the fewest jobs at the beginning.
+ builders = queue.SimpleQueue(
+ [builder for jobs, builder in sorted(builders)],
+ )
# Run for as long as we have unprocessed builders
while not builders.empty():