]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Factor out a _stop_workers() method on connection pool classes
authorDenis Laxalde <denis.laxalde@dalibo.com>
Mon, 15 Nov 2021 08:43:04 +0000 (09:43 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 3 Jan 2022 15:41:10 +0000 (16:41 +0100)
The method is symmetrical with open().
In particular, we take advantage of this method in
ConnectionPool.__del__() to remove some duplication.

psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index 0211424ae594c2400d6eb3ac46dfac16da896ae3..f92de0bed417d9099b1813c2cb4579d030a2518b 100644 (file)
@@ -11,7 +11,7 @@ from time import monotonic
 from queue import Queue, Empty
 from types import TracebackType
 from typing import Any, Callable, Dict, Iterator, List
-from typing import Optional, Type
+from typing import Optional, Sequence, Type
 from weakref import ref
 from contextlib import contextmanager
 
@@ -62,16 +62,7 @@ class ConnectionPool(BasePool[Connection[Any]]):
         if getattr(self, "_closed", True):
             return
 
-        # Things we can try to do on a best-effort basis while the world
-        # is crumbling (a-la Eternal Sunshine of the Spotless Mind)
-        # At worse we put an item in a queue that is being deleted.
-
-        # Stop the scheduler
-        self._sched.enter(0, None)
-
-        # Stop the worker threads
-        for i in range(len(self._workers)):
-            self.run_task(StopWorker(self))
+        self._stop_workers()
 
     def wait(self, timeout: float = 30.0) -> None:
         """
@@ -289,11 +280,19 @@ class ConnectionPool(BasePool[Connection[Any]]):
             # Take waiting client and pool connections out of the state
             waiting = list(self._waiting)
             self._waiting.clear()
-            pool = list(self._pool)
+            connections = list(self._pool)
             self._pool.clear()
 
         # Now that the flag _closed is set, getconn will fail immediately,
         # putconn will just close the returned connection.
+        self._stop_workers(waiting, connections, timeout)
+
+    def _stop_workers(
+        self,
+        waiting_clients: Sequence["WaitingClient"] = (),
+        connections: Sequence[Connection[Any]] = (),
+        timeout: float = 0,
+    ) -> None:
 
         # Stop the scheduler
         self._sched.enter(0, None)
@@ -304,11 +303,11 @@ class ConnectionPool(BasePool[Connection[Any]]):
             self.run_task(StopWorker(self))
 
         # Signal to eventual clients in the queue that business is closed.
-        for pos in waiting:
+        for pos in waiting_clients:
             pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
 
         # Close the connections still in the pool
-        for conn in pool:
+        for conn in connections:
             conn.close()
 
         # Wait for the worker threads to terminate
index 6c140c09c3f617841e3453e5a4db76a3723b4d18..c954898b1386511508ae028eb44de34e5525958a 100644 (file)
@@ -11,7 +11,7 @@ from abc import ABC, abstractmethod
 from time import monotonic
 from types import TracebackType
 from typing import Any, AsyncIterator, Awaitable, Callable
-from typing import Dict, List, Optional, Type
+from typing import Dict, List, Optional, Sequence, Type
 from weakref import ref
 
 from psycopg import errors as e
@@ -229,12 +229,19 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
             # Take waiting client and pool connections out of the state
             waiting = list(self._waiting)
             self._waiting.clear()
-            pool = list(self._pool)
+            connections = list(self._pool)
             self._pool.clear()
 
         # Now that the flag _closed is set, getconn will fail immediately,
         # putconn will just close the returned connection.
+        await self._stop_workers(waiting, connections, timeout)
 
+    async def _stop_workers(
+        self,
+        waiting_clients: Sequence["AsyncClient"] = (),
+        connections: Sequence[AsyncConnection[Any]] = (),
+        timeout: float = 0,
+    ) -> None:
         # Stop the scheduler
         await self._sched.enter(0, None)
 
@@ -244,11 +251,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
             self.run_task(StopWorker(self))
 
         # Signal to eventual clients in the queue that business is closed.
-        for pos in waiting:
+        for pos in waiting_clients:
             await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
 
         # Close the connections still in the pool
-        for conn in pool:
+        for conn in connections:
             await conn.close()
 
         # Wait for the worker tasks to terminate