try:
await asyncio.wait_for(self._pool_full_event.wait(), timeout)
except asyncio.TimeoutError:
- await self.close() # stop all the threads
+ await self.close() # stop all the tasks
raise PoolTimeout(
f"pool initialization incomplete after {timeout} sec"
) from None
self._waiting.append(pos)
self._stats[self._REQUESTS_QUEUED] += 1
- # Allow only one thread at time to grow the pool (or returning
+ # Allow only one task at time to grow the pool (or returning
# connections might be starved).
if self._nconns < self._max_size and not self._growing:
self._nconns += 1
await conn.close()
return
- # Use a worker to perform eventual maintenance work in a separate thread
+ # Use a worker to perform eventual maintenance work in a separate task
if self._reset:
self.run_task(ReturnConnection(self, conn))
else:
# Stop the scheduler
await self._sched.enter(0, None)
- # Stop the worker threads
+ # Stop the worker tasks
for w in self._workers:
self.run_task(StopWorker(self))
for conn in pool:
await conn.close()
- # Wait for the worker threads to terminate
+ # Wait for the worker tasks to terminate
wait = asyncio.gather(self._sched_runner, *self._workers)
try:
if timeout > 0:
self._reconnect_failed(self)
def run_task(self, task: "MaintenanceTask") -> None:
- """Run a maintenance task in a worker thread."""
+ """Run a maintenance task in a worker."""
self._tasks.put_nowait(task)
async def schedule_task(
self, task: "MaintenanceTask", delay: float
) -> None:
- """Run a maintenance task in a worker thread in the future."""
+ """Run a maintenance task in a worker in the future."""
await self._sched.enter(delay, task.tick)
@classmethod
async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
"""Runner to execute pending maintenance task.
- The function is designed to run as a separate thread.
+ The function is designed to run as a task.
Block on the queue *q*, run a task received. Finish running if a
StopWorker is received.
async def run(self) -> None:
"""Run the task.
- This usually happens in a worker thread. Call the concrete _run()
+ This usually happens in a worker. Call the concrete _run()
implementation, if the pool is still alive.
"""
pool = self.pool()
async def tick(self) -> None:
"""Run the scheduled task
- This function is called by the scheduler thread. Use a worker to
+ This function is called by the scheduler task. Use a worker to
run the task for real in order to free the scheduler immediately.
"""
pool = self.pool()
class StopWorker(MaintenanceTask):
- """Signal the maintenance thread to terminate."""
+ """Signal the maintenance worker to terminate."""
async def _run(self, pool: "AsyncConnectionPool") -> None:
pass