]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(pool): avoid possible deadlock (until timeout) on pool closing
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 6 May 2024 09:21:33 +0000 (11:21 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 7 May 2024 16:58:07 +0000 (18:58 +0200)
With the previous change to avoid finding open connections in the pool
(#784), stopping the worker was moved into the critical section. This
can create a deadlock in case a worker is in the process of obtaining a
new connection, because putting it to the pool requires the lock. The
deadlock only last for the default 5s timeout passed to _stop_workers().

Solve the problem by guarding _add_to_pool() to avoid it to try to add
the connection if the pool is closed.

However, refactor the pool closing sequence too and close the workers
and other resources that now out of the state outside the critical
section to keep the operation running under lock to a minimum.

psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index f1bdc9707605f1fb9174e8ae6a309fb26af8c704..2d17f5ff179c9612bcdfd6bdb3f7dd6f167ecd97 100644 (file)
@@ -106,7 +106,8 @@ class ConnectionPool(Generic[CT], BasePool):
         if getattr(self, "_closed", True):
             return
 
-        self._stop_workers(timeout=5.0)
+        workers = self._signal_stop_worker()
+        gather(*workers, timeout=5.0)
 
     def _check_open_getconn(self) -> None:
         super()._check_open_getconn()
@@ -421,23 +422,30 @@ class ConnectionPool(Generic[CT], BasePool):
             self._closed = True
             logger.debug("pool %r closed", self.name)
 
-            # Stop the worker, wait for the threads to finish.
-            self._stop_workers(timeout=timeout)
+            # Take waiting client and pool connections out of the state
+            waiting = list(self._waiting)
+            self._waiting.clear()
+            connections = list(self._pool)
+            self._pool.clear()
 
-            # Signal to eventual clients in the queue that business is closed.
-            while self._waiting:
-                pos = self._waiting.pop()
-                pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+            # Take the workers out of the pool. Will stop them outside the lock
+            workers = self._signal_stop_worker()
 
-            # Close the connections still in the pool
-            while self._pool:
-                conn = self._pool.pop()
-                conn.close()
+        # Now that the flag _closed is set, getconn will fail immediately,
+        # putconn will just close the returned connection.
 
-    # Now that the flag _closed is set, getconn will fail immediately,
-    # putconn will just close the returned connection.
+        # Wait for the worker tasks to terminate
+        gather(*workers, timeout=timeout)
 
-    def _stop_workers(self, timeout: float | None = None) -> None:
+        # Close the connections that were still in the pool
+        for conn in connections:
+            conn.close()
+
+        # Signal to eventual clients in the queue that business is closed.
+        for pos in waiting:
+            pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+    def _signal_stop_worker(self) -> list[Worker]:
         # Stop the scheduler
         self._sched.enter(0, None)
 
@@ -450,8 +458,7 @@ class ConnectionPool(Generic[CT], BasePool):
             workers.append(self._sched_runner)
             self._sched_runner = None
 
-        # Wait for the worker tasks to terminate
-        gather(*workers, timeout=timeout)
+        return workers
 
     def __enter__(self) -> Self:
         self._open_implicit = False
@@ -706,9 +713,25 @@ class ConnectionPool(Generic[CT], BasePool):
         # Also disable the warning for open connection in conn.__del__
         conn._pool = None
 
+        # Early bailout in case the pool is closed. Don't add anything to the
+        # state. There is still a remote chance that the pool will be closed
+        # between here and entering the lock. Therefore we will make another
+        # check later.
+        if self._closed:
+            conn.close()
+            return
+
         # Critical section: if there is a client waiting give it the connection
         # otherwise put it back into the pool.
         with self._lock:
+            # Check if the pool was closed by the time we arrived here. It is
+            # unlikely but it doesn't seem impossible, if the worker was adding
+            # this connection while the main process is closing the pool.
+            # Now that we are in the critical section we know for real.
+            if self._closed:
+                conn.close()
+                return
+
             while self._waiting:
                 # If there is a client waiting (which is still waiting and
                 # hasn't timed out), give it the connection and notify it.
index 39c0e6bbd9ba4ffe2be2f23f649116847e7197e3..a12f33c3565467f29bf6a53bf76ac297a985c236 100644 (file)
@@ -111,7 +111,8 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             if getattr(self, "_closed", True):
                 return
 
-            self._stop_workers(timeout=5.0)
+            workers = self._signal_stop_worker()
+            agather(*workers, timeout=5.0)
 
     def _check_open_getconn(self) -> None:
         super()._check_open_getconn()
@@ -453,23 +454,30 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             self._closed = True
             logger.debug("pool %r closed", self.name)
 
-            # Stop the worker, wait for the threads to finish.
-            await self._stop_workers(timeout=timeout)
+            # Take waiting client and pool connections out of the state
+            waiting = list(self._waiting)
+            self._waiting.clear()
+            connections = list(self._pool)
+            self._pool.clear()
 
-            # Signal to eventual clients in the queue that business is closed.
-            while self._waiting:
-                pos = self._waiting.pop()
-                await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
-            # Close the connections still in the pool
-            while self._pool:
-                conn = self._pool.pop()
-                await conn.close()
+            # Take the workers out of the pool. Will stop them outside the lock
+            workers = await self._signal_stop_worker()
 
         # Now that the flag _closed is set, getconn will fail immediately,
         # putconn will just close the returned connection.
 
-    async def _stop_workers(self, timeout: float | None = None) -> None:
+        # Wait for the worker tasks to terminate
+        await agather(*workers, timeout=timeout)
+
+        # Close the connections that were still in the pool
+        for conn in connections:
+            await conn.close()
+
+        # Signal to eventual clients in the queue that business is closed.
+        for pos in waiting:
+            await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+    async def _signal_stop_worker(self) -> list[AWorker]:
         # Stop the scheduler
         await self._sched.enter(0, None)
 
@@ -482,8 +490,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             workers.append(self._sched_runner)
             self._sched_runner = None
 
-        # Wait for the worker tasks to terminate
-        await agather(*workers, timeout=timeout)
+        return workers
 
     async def __aenter__(self) -> Self:
         self._open_implicit = False
@@ -756,9 +763,26 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         # Also disable the warning for open connection in conn.__del__
         conn._pool = None
 
+        # Early bailout in case the pool is closed. Don't add anything to the
+        # state. There is still a remote chance that the pool will be closed
+        # between here and entering the lock. Therefore we will make another
+        # check later.
+        if self._closed:
+            await conn.close()
+            return
+
         # Critical section: if there is a client waiting give it the connection
         # otherwise put it back into the pool.
         async with self._lock:
+
+            # Check if the pool was closed by the time we arrived here. It is
+            # unlikely but it doesn't seem impossible, if the worker was adding
+            # this connection while the main process is closing the pool.
+            # Now that we are in the critical section we know for real.
+            if self._closed:
+                await conn.close()
+                return
+
             while self._waiting:
                 # If there is a client waiting (which is still waiting and
                 # hasn't timed out), give it the connection and notify it.