(:ticket:`#1046`).
+psycopg_pool 3.2.7 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix infinite loop with connections modified to return on close
+ (:ticket:`#1124`).
+
+
Current release
---------------
# Close the connections that were still in the pool
for conn in connections:
+ conn._pool = None
conn.close()
# Signal to eventual clients in the queue that business is closed.
# Check for expired connections
if conn._expire_at <= monotonic():
logger.info("discarding expired connection %s", conn)
+ conn._pool = None
conn.close()
self.run_task(AddConnection(self))
continue
if conn._expire_at <= monotonic():
self.run_task(AddConnection(self))
logger.info("discarding expired connection")
+ conn._pool = None
conn.close()
return
ex,
conn,
)
+ conn._pool = None
conn.close()
elif status == TransactionStatus.ACTIVE:
# Connection returned during an operation. Bad... just close it.
logger.warning("closing returned connection: %s", conn)
+ conn._pool = None
conn.close()
if self._reset:
)
except Exception as ex:
logger.warning(f"error resetting connection: {ex}")
+ conn._pool = None
conn.close()
def _shrink_pool(self) -> None:
nconns_min,
self.max_idle,
)
+ to_close._pool = None
to_close.close()
def _get_measures(self) -> dict[str, int]:
# Close the connections that were still in the pool
for conn in connections:
+ conn._pool = None
await conn.close()
# Signal to eventual clients in the queue that business is closed.
# Check for expired connections
if conn._expire_at <= monotonic():
logger.info("discarding expired connection %s", conn)
+ conn._pool = None
await conn.close()
self.run_task(AddConnection(self))
continue
if conn._expire_at <= monotonic():
self.run_task(AddConnection(self))
logger.info("discarding expired connection")
+ conn._pool = None
await conn.close()
return
ex,
conn,
)
+ conn._pool = None
await conn.close()
elif status == TransactionStatus.ACTIVE:
# Connection returned during an operation. Bad... just close it.
logger.warning("closing returned connection: %s", conn)
+ conn._pool = None
await conn.close()
if self._reset:
)
except Exception as ex:
logger.warning(f"error resetting connection: {ex}")
+ conn._pool = None
await conn.close()
async def _shrink_pool(self) -> None:
nconns_min,
self.max_idle,
)
+ to_close._pool = None
await to_close.close()
def _get_measures(self) -> dict[str, int]:
assert conn.info.transaction_status == TransactionStatus.IDLE
-def test_override_close(dsn):
- # Verify that it's possible to override `close()` to act as `putconn()`.
- # which allows to use the psycopg pool in a sqlalchemy NullPool.
- #
- # We cannot guarantee 100% that we will never break this implementation,
- # but we can keep awareness that we use it this way, maintain it on a
- # best-effort basis, and notify upstream if we are forced to break it.
- #
- # https://github.com/sqlalchemy/sqlalchemy/discussions/12522
- # https://github.com/psycopg/psycopg/issues/1046
-
- class MyConnection(psycopg.Connection[Row]):
+class ReturningConnection(psycopg.Connection[Row]):
+ """
+ Test connection returning to the pool on close.
+
+ Verify that it's possible to override `close()` to act as `putconn()`.
+ which allows to use the psycopg pool in a sqlalchemy NullPool.
+
+ We cannot guarantee 100% that we will never break this implementation,
+ but we can keep awareness that we use it this way, maintain it on a
+ best-effort basis, and notify upstream if we are forced to break it.
+
+ https://github.com/sqlalchemy/sqlalchemy/discussions/12522
+ https://github.com/psycopg/psycopg/issues/1046
+ """
+
+ def close(self) -> None:
+ if pool := getattr(self, "_pool", None):
+ # Connection currently checked out from the pool.
+ # Instead of closing it, return it to the pool.
+ pool.putconn(self)
+ else:
+ # Connection not part of any pool, or currently into the pool.
+ # Close the connection for real.
+ super().close()
- def close(self) -> None:
- if pool := getattr(self, "_pool", None):
- # Connection currently checked out from the pool.
- # Instead of closing it, return it to the pool.
- pool.putconn(self)
- else:
- # Connection not part of any pool, or currently into the pool.
- # Close the connection for real.
- super().close()
- with pool.ConnectionPool(dsn, connection_class=MyConnection, min_size=2) as p:
+def test_override_close(dsn):
+ with pool.ConnectionPool(
+ dsn, connection_class=ReturningConnection, min_size=2
+ ) as p:
p.wait()
assert len(p._pool) == 2
conn = p.getconn()
with pytest.raises(TypeError, match="close_returns=True"):
pool.ConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+def test_close_returns_no_loop(dsn):
+ with pool.ConnectionPool(
+ dsn, min_size=1, close_returns=True, max_lifetime=0.05
+ ) as p:
+ conn = p.getconn()
+ sleep(0.1)
+ assert len(p._pool) == 0
+ sleep(0.1) # wait for the connection to expire
+ conn.close()
+ sleep(0.1)
+ assert len(p._pool) == 1
+ conn = p.getconn()
+ sleep(0.1)
+ assert len(p._pool) == 0
+ conn.close()
+ sleep(0.1)
+ assert len(p._pool) == 1
+
+
+@pytest.mark.slow
+def test_override_close_no_loop_subclass(dsn):
+ with pool.ConnectionPool(
+ dsn, min_size=1, max_lifetime=0.05, connection_class=ReturningConnection
+ ) as p:
+ conn = p.getconn()
+ sleep(0.1)
+ assert len(p._pool) == 0
+ sleep(0.1) # wait for the connection to expire
+ conn.close()
+ sleep(0.1)
+ assert len(p._pool) == 1
+ conn = p.getconn()
+ sleep(0.1)
+ assert len(p._pool) == 0
+ conn.close()
+ sleep(0.1)
+ assert len(p._pool) == 1
assert conn.info.transaction_status == TransactionStatus.IDLE
-async def test_override_close(dsn):
- # Verify that it's possible to override `close()` to act as `putconn()`.
- # which allows to use the psycopg pool in a sqlalchemy NullPool.
- #
- # We cannot guarantee 100% that we will never break this implementation,
- # but we can keep awareness that we use it this way, maintain it on a
- # best-effort basis, and notify upstream if we are forced to break it.
- #
- # https://github.com/sqlalchemy/sqlalchemy/discussions/12522
- # https://github.com/psycopg/psycopg/issues/1046
+class ReturningConnection(psycopg.AsyncConnection[Row]):
+ """
+ Test connection returning to the pool on close.
+
+ Verify that it's possible to override `close()` to act as `putconn()`.
+ which allows to use the psycopg pool in a sqlalchemy NullPool.
+
+ We cannot guarantee 100% that we will never break this implementation,
+ but we can keep awareness that we use it this way, maintain it on a
+ best-effort basis, and notify upstream if we are forced to break it.
+
+ https://github.com/sqlalchemy/sqlalchemy/discussions/12522
+ https://github.com/psycopg/psycopg/issues/1046
+ """
+
+ async def close(self) -> None:
+ if pool := getattr(self, "_pool", None):
+ # Connection currently checked out from the pool.
+ # Instead of closing it, return it to the pool.
+ await pool.putconn(self)
+ else:
+ # Connection not part of any pool, or currently into the pool.
+ # Close the connection for real.
+ await super().close()
- class MyConnection(psycopg.AsyncConnection[Row]):
- async def close(self) -> None:
- if pool := getattr(self, "_pool", None):
- # Connection currently checked out from the pool.
- # Instead of closing it, return it to the pool.
- await pool.putconn(self)
- else:
- # Connection not part of any pool, or currently into the pool.
- # Close the connection for real.
- await super().close()
+async def test_override_close(dsn):
async with pool.AsyncConnectionPool(
- dsn, connection_class=MyConnection, min_size=2
+ dsn, connection_class=ReturningConnection, min_size=2
) as p:
await p.wait()
assert len(p._pool) == 2
with pytest.raises(TypeError, match="close_returns=True"):
pool.AsyncConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+async def test_close_returns_no_loop(dsn):
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=1, close_returns=True, max_lifetime=0.05
+ ) as p:
+ conn = await p.getconn()
+ await asleep(0.1)
+ assert len(p._pool) == 0
+ await asleep(0.1) # wait for the connection to expire
+ await conn.close()
+ await asleep(0.1)
+ assert len(p._pool) == 1
+ conn = await p.getconn()
+ await asleep(0.1)
+ assert len(p._pool) == 0
+ await conn.close()
+ await asleep(0.1)
+ assert len(p._pool) == 1
+
+
+@pytest.mark.slow
+async def test_override_close_no_loop_subclass(dsn):
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=1, max_lifetime=0.05, connection_class=ReturningConnection
+ ) as p:
+ conn = await p.getconn()
+ await asleep(0.1)
+ assert len(p._pool) == 0
+ await asleep(0.1) # wait for the connection to expire
+ await conn.close()
+ await asleep(0.1)
+ assert len(p._pool) == 1
+ conn = await p.getconn()
+ await asleep(0.1)
+ assert len(p._pool) == 0
+ await conn.close()
+ await asleep(0.1)
+ assert len(p._pool) == 1