return await self.db.fetch_one(stmt)
+ @property
+ def least_busy(self):
+ """
+ Returns all builders in order of busyness (i.e. running the least jobs)
+ """
+ builder_jobs = (
+ sqlalchemy
+ .select(
+ Builder.id.label("builder_id"),
+ sqlalchemy.func.count().label("running_jobs"),
+ )
+ .join(
+ jobs.Job,
+ jobs.Job.builder_id == Builder.id,
+ )
+ .where(
+ # Jobs cannot be deleted
+ jobs.Job.deleted_at == None,
+
+ # Jobs must be running
+ jobs.Job.started_at != None,
+ jobs.Job.finished_at == None,
+ )
+ .group_by(
+ Builder.id,
+ )
+ .cte("builder_jobs")
+ )
+
+ stmt = (
+ sqlalchemy
+ .select(
+ Builder,
+ )
+ .select_from(
+ builder_jobs,
+ )
+ .join(
+ Builder,
+ Builder.id == builder_jobs.c.builder_id,
+ )
+ .order_by(
+ (
+ builder_jobs.c.running_jobs
+ /
+ Builder.max_jobs
+ ).asc(),
+ )
+ )
+
+ return self.db.fetch(stmt)
+
async def autoscale(self, wait=False):
"""
This method performs two tasks:
# Clear the request
self._dispatch_requested.clear()
- builders = []
+ builders = queue.SimpleQueue()
# Add all builders to the queue in ascending order of running jobs
- async for builder in self.backend.builders:
+ async for builder in self.backend.builders.least_busy:
# Skip any builders that are not enabled
if not builder.enabled:
continue
# Skip any builders that are not connected
- elif not builder.is_connected():
+ elif not builder.is_online():
continue
# Skip any builders that are already full
elif builder.is_full():
continue
- # Fetch all jobs this builder is currently running
- jobs = await builder.get_jobs()
-
- # Add the builder to the list and include the number of running jobs
- builders.append(
- len(jobs), builder,
- )
-
- # Create a queue for all builders and put those
- # that have the fewest jobs at the beginning.
- builders = queue.SimpleQueue(
- [builder for jobs, builder in sorted(builders)],
- )
+ # Add it to the queue
+ builders.put(builder)
# Run for as long as we have unprocessed builders
while not builders.empty():