]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(pool): fix infinite loop with close_returns=True
authorbash000000 <m2588953@outlook.com>
Sat, 26 Jul 2025 13:44:49 +0000 (21:44 +0800)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 19 Oct 2025 01:32:16 +0000 (03:32 +0200)
Close #1124

docs/news_pool.rst
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index a2a463e0efc7591e12181449ea3718e68e9f170b..506cb16081c8419f56f0c9c30bc0eb3605891a91 100644 (file)
@@ -17,6 +17,13 @@ psycopg_pool 3.3.0 (unreleased)
   (:ticket:`#1046`).
 
 
+psycopg_pool 3.2.7 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix infinite loop with connections modified to return on close
+  (:ticket:`#1124`).
+
+
 Current release
 ---------------
 
index e67a32f249b2a01b75cbdf34bad60392ef6f58bb..d8411d070ea93b51e5fa5c7b528092f84d86f336 100644 (file)
@@ -449,6 +449,7 @@ class ConnectionPool(Generic[CT], BasePool):
 
         # Close the connections that were still in the pool
         for conn in connections:
+            conn._pool = None
             conn.close()
 
         # Signal to eventual clients in the queue that business is closed.
@@ -521,6 +522,7 @@ class ConnectionPool(Generic[CT], BasePool):
             # Check for expired connections
             if conn._expire_at <= monotonic():
                 logger.info("discarding expired connection %s", conn)
+                conn._pool = None
                 conn.close()
                 self.run_task(AddConnection(self))
                 continue
@@ -700,6 +702,7 @@ class ConnectionPool(Generic[CT], BasePool):
         if conn._expire_at <= monotonic():
             self.run_task(AddConnection(self))
             logger.info("discarding expired connection")
+            conn._pool = None
             conn.close()
             return
 
@@ -773,10 +776,12 @@ class ConnectionPool(Generic[CT], BasePool):
                     ex,
                     conn,
                 )
+                conn._pool = None
                 conn.close()
         elif status == TransactionStatus.ACTIVE:
             # Connection returned during an operation. Bad... just close it.
             logger.warning("closing returned connection: %s", conn)
+            conn._pool = None
             conn.close()
 
         if self._reset:
@@ -789,6 +794,7 @@ class ConnectionPool(Generic[CT], BasePool):
                     )
             except Exception as ex:
                 logger.warning(f"error resetting connection: {ex}")
+                conn._pool = None
                 conn.close()
 
     def _shrink_pool(self) -> None:
@@ -813,6 +819,7 @@ class ConnectionPool(Generic[CT], BasePool):
                 nconns_min,
                 self.max_idle,
             )
+            to_close._pool = None
             to_close.close()
 
     def _get_measures(self) -> dict[str, int]:
index 5731de9af1cd17a998881b1f63d8e3befbf3bc68..99e6b00bb684c55082c009453ebd8cd4cbc9d2df 100644 (file)
@@ -488,6 +488,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
 
         # Close the connections that were still in the pool
         for conn in connections:
+            conn._pool = None
             await conn.close()
 
         # Signal to eventual clients in the queue that business is closed.
@@ -560,6 +561,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             # Check for expired connections
             if conn._expire_at <= monotonic():
                 logger.info("discarding expired connection %s", conn)
+                conn._pool = None
                 await conn.close()
                 self.run_task(AddConnection(self))
                 continue
@@ -752,6 +754,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         if conn._expire_at <= monotonic():
             self.run_task(AddConnection(self))
             logger.info("discarding expired connection")
+            conn._pool = None
             await conn.close()
             return
 
@@ -827,11 +830,13 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                     ex,
                     conn,
                 )
+                conn._pool = None
                 await conn.close()
 
         elif status == TransactionStatus.ACTIVE:
             # Connection returned during an operation. Bad... just close it.
             logger.warning("closing returned connection: %s", conn)
+            conn._pool = None
             await conn.close()
 
         if self._reset:
@@ -845,6 +850,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                     )
             except Exception as ex:
                 logger.warning(f"error resetting connection: {ex}")
+                conn._pool = None
                 await conn.close()
 
     async def _shrink_pool(self) -> None:
@@ -870,6 +876,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                 nconns_min,
                 self.max_idle,
             )
+            to_close._pool = None
             await to_close.close()
 
     def _get_measures(self) -> dict[str, int]:
index df90c1c3e7c08d11c67dc5dfc4ffaa4e50431c3a..95a96db46b68018229b814fff0a7904fb7e46066 100644 (file)
@@ -1043,30 +1043,36 @@ def test_check_returns_an_ok_connection(dsn, status):
         assert conn.info.transaction_status == TransactionStatus.IDLE
 
 
-def test_override_close(dsn):
-    # Verify that it's possible to override `close()` to act as `putconn()`.
-    # which allows to use the psycopg pool in a sqlalchemy NullPool.
-    #
-    # We cannot guarantee 100% that we will never break this implementation,
-    # but we can keep awareness that we use it this way, maintain it on a
-    # best-effort basis, and notify upstream if we are forced to break it.
-    #
-    # https://github.com/sqlalchemy/sqlalchemy/discussions/12522
-    # https://github.com/psycopg/psycopg/issues/1046
-
-    class MyConnection(psycopg.Connection[Row]):
+class ReturningConnection(psycopg.Connection[Row]):
+    """
+    Test connection returning to the pool on close.
+
+    Verify that it's possible to override `close()` to act as `putconn()`.
+    which allows to use the psycopg pool in a sqlalchemy NullPool.
+
+    We cannot guarantee 100% that we will never break this implementation,
+    but we can keep awareness that we use it this way, maintain it on a
+    best-effort basis, and notify upstream if we are forced to break it.
+
+    https://github.com/sqlalchemy/sqlalchemy/discussions/12522
+    https://github.com/psycopg/psycopg/issues/1046
+    """
+
+    def close(self) -> None:
+        if pool := getattr(self, "_pool", None):
+            # Connection currently checked out from the pool.
+            # Instead of closing it, return it to the pool.
+            pool.putconn(self)
+        else:
+            # Connection not part of any pool, or currently into the pool.
+            # Close the connection for real.
+            super().close()
 
-        def close(self) -> None:
-            if pool := getattr(self, "_pool", None):
-                # Connection currently checked out from the pool.
-                # Instead of closing it, return it to the pool.
-                pool.putconn(self)
-            else:
-                # Connection not part of any pool, or currently into the pool.
-                # Close the connection for real.
-                super().close()
 
-    with pool.ConnectionPool(dsn, connection_class=MyConnection, min_size=2) as p:
+def test_override_close(dsn):
+    with pool.ConnectionPool(
+        dsn, connection_class=ReturningConnection, min_size=2
+    ) as p:
         p.wait()
         assert len(p._pool) == 2
         conn = p.getconn()
@@ -1121,3 +1127,44 @@ def test_close_returns_custom_class_old(dsn):
 
     with pytest.raises(TypeError, match="close_returns=True"):
         pool.ConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+def test_close_returns_no_loop(dsn):
+    with pool.ConnectionPool(
+        dsn, min_size=1, close_returns=True, max_lifetime=0.05
+    ) as p:
+        conn = p.getconn()
+        sleep(0.1)
+        assert len(p._pool) == 0
+        sleep(0.1)  # wait for the connection to expire
+        conn.close()
+        sleep(0.1)
+        assert len(p._pool) == 1
+        conn = p.getconn()
+        sleep(0.1)
+        assert len(p._pool) == 0
+        conn.close()
+        sleep(0.1)
+        assert len(p._pool) == 1
+
+
+@pytest.mark.slow
+def test_override_close_no_loop_subclass(dsn):
+    with pool.ConnectionPool(
+        dsn, min_size=1, max_lifetime=0.05, connection_class=ReturningConnection
+    ) as p:
+        conn = p.getconn()
+        sleep(0.1)
+        assert len(p._pool) == 0
+        sleep(0.1)  # wait for the connection to expire
+        conn.close()
+        sleep(0.1)
+        assert len(p._pool) == 1
+        conn = p.getconn()
+        sleep(0.1)
+        assert len(p._pool) == 0
+        conn.close()
+        sleep(0.1)
+        assert len(p._pool) == 1
index 6cc20e67217aacade168fd311b022987514ec474..970b2311ac464bbf207cb453064d0354eb09a8c0 100644 (file)
@@ -1044,30 +1044,35 @@ async def test_check_returns_an_ok_connection(dsn, status):
         assert conn.info.transaction_status == TransactionStatus.IDLE
 
 
-async def test_override_close(dsn):
-    # Verify that it's possible to override `close()` to act as `putconn()`.
-    # which allows to use the psycopg pool in a sqlalchemy NullPool.
-    #
-    # We cannot guarantee 100% that we will never break this implementation,
-    # but we can keep awareness that we use it this way, maintain it on a
-    # best-effort basis, and notify upstream if we are forced to break it.
-    #
-    # https://github.com/sqlalchemy/sqlalchemy/discussions/12522
-    # https://github.com/psycopg/psycopg/issues/1046
+class ReturningConnection(psycopg.AsyncConnection[Row]):
+    """
+    Test connection returning to the pool on close.
+
+    Verify that it's possible to override `close()` to act as `putconn()`.
+    which allows to use the psycopg pool in a sqlalchemy NullPool.
+
+    We cannot guarantee 100% that we will never break this implementation,
+    but we can keep awareness that we use it this way, maintain it on a
+    best-effort basis, and notify upstream if we are forced to break it.
+
+    https://github.com/sqlalchemy/sqlalchemy/discussions/12522
+    https://github.com/psycopg/psycopg/issues/1046
+    """
+
+    async def close(self) -> None:
+        if pool := getattr(self, "_pool", None):
+            # Connection currently checked out from the pool.
+            # Instead of closing it, return it to the pool.
+            await pool.putconn(self)
+        else:
+            # Connection not part of any pool, or currently into the pool.
+            # Close the connection for real.
+            await super().close()
 
-    class MyConnection(psycopg.AsyncConnection[Row]):
-        async def close(self) -> None:
-            if pool := getattr(self, "_pool", None):
-                # Connection currently checked out from the pool.
-                # Instead of closing it, return it to the pool.
-                await pool.putconn(self)
-            else:
-                # Connection not part of any pool, or currently into the pool.
-                # Close the connection for real.
-                await super().close()
 
+async def test_override_close(dsn):
     async with pool.AsyncConnectionPool(
-        dsn, connection_class=MyConnection, min_size=2
+        dsn, connection_class=ReturningConnection, min_size=2
     ) as p:
         await p.wait()
         assert len(p._pool) == 2
@@ -1122,3 +1127,44 @@ async def test_close_returns_custom_class_old(dsn):
 
     with pytest.raises(TypeError, match="close_returns=True"):
         pool.AsyncConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+async def test_close_returns_no_loop(dsn):
+    async with pool.AsyncConnectionPool(
+        dsn, min_size=1, close_returns=True, max_lifetime=0.05
+    ) as p:
+        conn = await p.getconn()
+        await asleep(0.1)
+        assert len(p._pool) == 0
+        await asleep(0.1)  # wait for the connection to expire
+        await conn.close()
+        await asleep(0.1)
+        assert len(p._pool) == 1
+        conn = await p.getconn()
+        await asleep(0.1)
+        assert len(p._pool) == 0
+        await conn.close()
+        await asleep(0.1)
+        assert len(p._pool) == 1
+
+
+@pytest.mark.slow
+async def test_override_close_no_loop_subclass(dsn):
+    async with pool.AsyncConnectionPool(
+        dsn, min_size=1, max_lifetime=0.05, connection_class=ReturningConnection
+    ) as p:
+        conn = await p.getconn()
+        await asleep(0.1)
+        assert len(p._pool) == 0
+        await asleep(0.1)  # wait for the connection to expire
+        await conn.close()
+        await asleep(0.1)
+        assert len(p._pool) == 1
+        conn = await p.getconn()
+        await asleep(0.1)
+        assert len(p._pool) == 0
+        await conn.close()
+        await asleep(0.1)
+        assert len(p._pool) == 1