From: Denis Laxalde Date: Mon, 15 Nov 2021 08:43:04 +0000 (+0100) Subject: Factor out a _stop_workers() method on connection pool classes X-Git-Tag: pool-3.1~45^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bcf1d5619d1d3b61810f43170b4b399626e3dced;p=thirdparty%2Fpsycopg.git Factor out a _stop_workers() method on connection pool classes The method is symmetrical with open(). In particular, we take advantage of this method in ConnectionPool.__del__() to remove some duplication. --- diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 0211424ae..f92de0bed 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -11,7 +11,7 @@ from time import monotonic from queue import Queue, Empty from types import TracebackType from typing import Any, Callable, Dict, Iterator, List -from typing import Optional, Type +from typing import Optional, Sequence, Type from weakref import ref from contextlib import contextmanager @@ -62,16 +62,7 @@ class ConnectionPool(BasePool[Connection[Any]]): if getattr(self, "_closed", True): return - # Things we can try to do on a best-effort basis while the world - # is crumbling (a-la Eternal Sunshine of the Spotless Mind) - # At worse we put an item in a queue that is being deleted. - - # Stop the scheduler - self._sched.enter(0, None) - - # Stop the worker threads - for i in range(len(self._workers)): - self.run_task(StopWorker(self)) + self._stop_workers() def wait(self, timeout: float = 30.0) -> None: """ @@ -289,11 +280,19 @@ class ConnectionPool(BasePool[Connection[Any]]): # Take waiting client and pool connections out of the state waiting = list(self._waiting) self._waiting.clear() - pool = list(self._pool) + connections = list(self._pool) self._pool.clear() # Now that the flag _closed is set, getconn will fail immediately, # putconn will just close the returned connection. + self._stop_workers(waiting, connections, timeout) + + def _stop_workers( + self, + waiting_clients: Sequence["WaitingClient"] = (), + connections: Sequence[Connection[Any]] = (), + timeout: float = 0, + ) -> None: # Stop the scheduler self._sched.enter(0, None) @@ -304,11 +303,11 @@ class ConnectionPool(BasePool[Connection[Any]]): self.run_task(StopWorker(self)) # Signal to eventual clients in the queue that business is closed. - for pos in waiting: + for pos in waiting_clients: pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) # Close the connections still in the pool - for conn in pool: + for conn in connections: conn.close() # Wait for the worker threads to terminate diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 6c140c09c..c954898b1 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -11,7 +11,7 @@ from abc import ABC, abstractmethod from time import monotonic from types import TracebackType from typing import Any, AsyncIterator, Awaitable, Callable -from typing import Dict, List, Optional, Type +from typing import Dict, List, Optional, Sequence, Type from weakref import ref from psycopg import errors as e @@ -229,12 +229,19 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): # Take waiting client and pool connections out of the state waiting = list(self._waiting) self._waiting.clear() - pool = list(self._pool) + connections = list(self._pool) self._pool.clear() # Now that the flag _closed is set, getconn will fail immediately, # putconn will just close the returned connection. + await self._stop_workers(waiting, connections, timeout) + async def _stop_workers( + self, + waiting_clients: Sequence["AsyncClient"] = (), + connections: Sequence[AsyncConnection[Any]] = (), + timeout: float = 0, + ) -> None: # Stop the scheduler await self._sched.enter(0, None) @@ -244,11 +251,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): self.run_task(StopWorker(self)) # Signal to eventual clients in the queue that business is closed. - for pos in waiting: + for pos in waiting_clients: await pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) # Close the connections still in the pool - for conn in pool: + for conn in connections: await conn.close() # Wait for the worker tasks to terminate