]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(pool): make sure there are no connection in the pool after close()
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 10 Apr 2024 21:58:24 +0000 (23:58 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Apr 2024 01:20:05 +0000 (03:20 +0200)
The case has been reported in #784. While not easy to reproduce, it
seems that it might be caused by the pool being closed while a worker is
still trying to create a connection, which will be put in the _pool
state after supposedly no other operation should have been performed.

Stop the workers and then empty the pool only after they have stopped to
run.

Also refactor the cleanup of the pool and waiting queue, moving them
to close(). There is no reason why a method called "stop workers" should
empty them, and there is no other code path that use such feature.

Close #784.

docs/news_pool.rst
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index 503202db2bb0e021516da6c34d1055a6000c1371..99269b5c8541f15500d30f2ea5aaa27fc94027e4 100644 (file)
@@ -15,6 +15,7 @@ psycopg_pool 3.2.2 (unreleased)
 
 - Raise a `RuntimeWarning` instead of a `DeprecationWarning` if an async pool
   is open in the constructor.
+- Fix connections possibly left in the pool after closing (:ticket:`#784`).
 
 
 Current release
index e08476e603ae06ddb67a107a1d5c3603bc641ec7..f1bdc9707605f1fb9174e8ae6a309fb26af8c704 100644 (file)
@@ -15,7 +15,7 @@ from abc import ABC, abstractmethod
 from time import monotonic
 from types import TracebackType
 from typing import Any, Iterator, cast, Dict, Generic, List
-from typing import Optional, Sequence, Type
+from typing import Optional, Type
 from weakref import ref
 from contextlib import contextmanager
 
@@ -106,7 +106,7 @@ class ConnectionPool(Generic[CT], BasePool):
         if getattr(self, "_closed", True):
             return
 
-        self._stop_workers()
+        self._stop_workers(timeout=5.0)
 
     def _check_open_getconn(self) -> None:
         super()._check_open_getconn()
@@ -421,22 +421,23 @@ class ConnectionPool(Generic[CT], BasePool):
             self._closed = True
             logger.debug("pool %r closed", self.name)
 
-            # Take waiting client and pool connections out of the state
-            waiting = list(self._waiting)
-            self._waiting.clear()
-            connections = list(self._pool)
-            self._pool.clear()
+            # Stop the worker, wait for the threads to finish.
+            self._stop_workers(timeout=timeout)
 
-        # Now that the flag _closed is set, getconn will fail immediately,
-        # putconn will just close the returned connection.
-        self._stop_workers(waiting, connections, timeout)
+            # Signal to eventual clients in the queue that business is closed.
+            while self._waiting:
+                pos = self._waiting.pop()
+                pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
 
-    def _stop_workers(
-        self,
-        waiting_clients: Sequence[WaitingClient[CT]] = (),
-        connections: Sequence[CT] = (),
-        timeout: float | None = None,
-    ) -> None:
+            # Close the connections still in the pool
+            while self._pool:
+                conn = self._pool.pop()
+                conn.close()
+
+    # Now that the flag _closed is set, getconn will fail immediately,
+    # putconn will just close the returned connection.
+
+    def _stop_workers(self, timeout: float | None = None) -> None:
         # Stop the scheduler
         self._sched.enter(0, None)
 
@@ -445,18 +446,12 @@ class ConnectionPool(Generic[CT], BasePool):
         for _ in workers:
             self.run_task(StopWorker(self))
 
-        # Signal to eventual clients in the queue that business is closed.
-        for pos in waiting_clients:
-            pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
-        # Close the connections still in the pool
-        for conn in connections:
-            conn.close()
+        if self._sched_runner:  # likely
+            workers.append(self._sched_runner)
+            self._sched_runner = None
 
         # Wait for the worker tasks to terminate
-        assert self._sched_runner is not None
-        sched_runner, self._sched_runner = (self._sched_runner, None)
-        gather(sched_runner, *workers, timeout=timeout)
+        gather(*workers, timeout=timeout)
 
     def __enter__(self) -> Self:
         self._open_implicit = False
index e70781b6031e345fb5ad3e438de640d1fd9f1e4d..39c0e6bbd9ba4ffe2be2f23f649116847e7197e3 100644 (file)
@@ -12,7 +12,7 @@ from abc import ABC, abstractmethod
 from time import monotonic
 from types import TracebackType
 from typing import Any, AsyncIterator, cast, Dict, Generic, List
-from typing import Optional, Sequence, Type
+from typing import Optional, Type
 from weakref import ref
 from contextlib import asynccontextmanager
 
@@ -111,7 +111,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             if getattr(self, "_closed", True):
                 return
 
-            self._stop_workers()
+            self._stop_workers(timeout=5.0)
 
     def _check_open_getconn(self) -> None:
         super()._check_open_getconn()
@@ -453,22 +453,23 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             self._closed = True
             logger.debug("pool %r closed", self.name)
 
-            # Take waiting client and pool connections out of the state
-            waiting = list(self._waiting)
-            self._waiting.clear()
-            connections = list(self._pool)
-            self._pool.clear()
+            # Stop the worker, wait for the threads to finish.
+            await self._stop_workers(timeout=timeout)
+
+            # Signal to eventual clients in the queue that business is closed.
+            while self._waiting:
+                pos = self._waiting.pop()
+                await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+            # Close the connections still in the pool
+            while self._pool:
+                conn = self._pool.pop()
+                await conn.close()
 
         # Now that the flag _closed is set, getconn will fail immediately,
         # putconn will just close the returned connection.
-        await self._stop_workers(waiting, connections, timeout)
 
-    async def _stop_workers(
-        self,
-        waiting_clients: Sequence[WaitingClient[ACT]] = (),
-        connections: Sequence[ACT] = (),
-        timeout: float | None = None,
-    ) -> None:
+    async def _stop_workers(self, timeout: float | None = None) -> None:
         # Stop the scheduler
         await self._sched.enter(0, None)
 
@@ -477,18 +478,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         for _ in workers:
             self.run_task(StopWorker(self))
 
-        # Signal to eventual clients in the queue that business is closed.
-        for pos in waiting_clients:
-            await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
-        # Close the connections still in the pool
-        for conn in connections:
-            await conn.close()
+        if self._sched_runner:  # likely
+            workers.append(self._sched_runner)
+            self._sched_runner = None
 
         # Wait for the worker tasks to terminate
-        assert self._sched_runner is not None
-        sched_runner, self._sched_runner = self._sched_runner, None
-        await agather(sched_runner, *workers, timeout=timeout)
+        await agather(*workers, timeout=timeout)
 
     async def __aenter__(self) -> Self:
         self._open_implicit = False