from queue import Queue, Empty
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, List
-from typing import Optional, Type
+from typing import Optional, Sequence, Type
from weakref import ref
from contextlib import contextmanager
if getattr(self, "_closed", True):
return
- # Things we can try to do on a best-effort basis while the world
- # is crumbling (a-la Eternal Sunshine of the Spotless Mind)
- # At worse we put an item in a queue that is being deleted.
-
- # Stop the scheduler
- self._sched.enter(0, None)
-
- # Stop the worker threads
- for i in range(len(self._workers)):
- self.run_task(StopWorker(self))
+ self._stop_workers()
def wait(self, timeout: float = 30.0) -> None:
"""
# Take waiting client and pool connections out of the state
waiting = list(self._waiting)
self._waiting.clear()
- pool = list(self._pool)
+ connections = list(self._pool)
self._pool.clear()
# Now that the flag _closed is set, getconn will fail immediately,
# putconn will just close the returned connection.
+ self._stop_workers(waiting, connections, timeout)
+
+ def _stop_workers(
+ self,
+ waiting_clients: Sequence["WaitingClient"] = (),
+ connections: Sequence[Connection[Any]] = (),
+ timeout: float = 0,
+ ) -> None:
# Stop the scheduler
self._sched.enter(0, None)
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
- for pos in waiting:
+ for pos in waiting_clients:
pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
# Close the connections still in the pool
- for conn in pool:
+ for conn in connections:
conn.close()
# Wait for the worker threads to terminate
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable
-from typing import Dict, List, Optional, Type
+from typing import Dict, List, Optional, Sequence, Type
from weakref import ref
from psycopg import errors as e
# Take waiting client and pool connections out of the state
waiting = list(self._waiting)
self._waiting.clear()
- pool = list(self._pool)
+ connections = list(self._pool)
self._pool.clear()
# Now that the flag _closed is set, getconn will fail immediately,
# putconn will just close the returned connection.
+ await self._stop_workers(waiting, connections, timeout)
+ async def _stop_workers(
+ self,
+ waiting_clients: Sequence["AsyncClient"] = (),
+ connections: Sequence[AsyncConnection[Any]] = (),
+ timeout: float = 0,
+ ) -> None:
# Stop the scheduler
await self._sched.enter(0, None)
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
- for pos in waiting:
+ for pos in waiting_clients:
await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
# Close the connections still in the pool
- for conn in pool:
+ for conn in connections:
await conn.close()
# Wait for the worker tasks to terminate