# XXX max queue length
threshold = datetime.timedelta(minutes=5)
- # Fetch all builders
- builders = [b async for b in self]
-
- # Fetch the priority for each builder
- builders = dict(
- zip(builders, await asyncio.gather(
- *(b._autoscale_sort() for b in builders),
- )),
- )
+ builders = {}
+
+ # Fetch all builders and their value for sorting
+ async with asyncio.TaskGroup() as tg:
+ async for builder in self:
+ task = tg.create_task(
+ builder._autoscale_sort(),
+ )
+
+ builders[builder] = task
+
+ # Fetch the results
+ builders = { builder : builders[builder].result() for builder in builders }
# Sort all builders after their priority
builders = sorted(builders, key=lambda b: (-builders[b], b))
# Run through all build jobs and allocate them to a builder.
# If a builder is full (i.e. reaches the threshold of its build time),
# we move on to the next builder until that is full and so on.
- for job in self.backend.jobs.queue:
+ async for job in self.backend.jobs.queue:
log.debug("Processing job %s..." % job)
for builder in builders:
# Find all builders that are no longer needed and can be shut down
builders_to_be_shut_down = [
- builder for builder in builders if not queue[builder] and builder.is_idle()
+ builder for builder in builders if not queue[builder] and await builder.is_idle()
]
async with asyncio.TaskGroup() as tasks:
continue
# Skip any builders that are already full
- elif builder.is_full():
+ elif await builder.is_full():
continue
# Add it to the queue
# If the builder has not any free slots remaining, we put it back in the
# queue to fill it up, but we give priority to builders who have fewer
# jobs to run.
- if not builder.is_full():
+ if not await builder.is_full():
builders.put(builder)
# Auto-scale builders
"builds:create-test-builds" : self._builds_create_test_builds,
"builds:reverse-requires" : self._builds_reverse_requires,
+ # Builders
+ "builders:autoscale" : self.backend.builders.autoscale,
+
# Certificates
"load-certificate" : self.backend.load_certificate,