From: Daniele Varrazzo Date: Sun, 4 May 2025 19:21:19 +0000 (+0200) Subject: feat(pool): add close_returns X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=23755b168e2ce83f1b1844f0a919545094f50fe0;p=thirdparty%2Fpsycopg.git feat(pool): add close_returns Behaviour implemented via subclassing Psycopg 3.2. Close #1046 --- diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index b38326d83..0d74066f3 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -183,6 +183,12 @@ class Connection(BaseConnection[Row]): """Close the database connection.""" if self.closed: return + + pool = getattr(self, "_pool", None) + if pool and getattr(pool, "close_returns", False): + pool.putconn(self) + return + self._closed = True # TODO: maybe send a cancel on close, if the connection is ACTIVE? diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index a12527ab2..0a10ae24a 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -204,6 +204,12 @@ class AsyncConnection(BaseConnection[Row]): """Close the database connection.""" if self.closed: return + + pool = getattr(self, "_pool", None) + if pool and getattr(pool, "close_returns", False): + await pool.putconn(self) + return + self._closed = True # TODO: maybe send a cancel on close, if the connection is ACTIVE? diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py index e708a4c60..ad109a0d4 100644 --- a/psycopg_pool/psycopg_pool/base.py +++ b/psycopg_pool/psycopg_pool/base.py @@ -50,6 +50,7 @@ class BasePool: min_size: int, max_size: int | None, name: str | None, + close_returns: bool, timeout: float, max_waiting: int, max_lifetime: float, @@ -69,6 +70,7 @@ class BasePool: self.conninfo = conninfo self.kwargs: dict[str, Any] = kwargs or {} self.name = name + self.close_returns = close_returns self._min_size = min_size self._max_size = max_size self.timeout = timeout diff --git a/psycopg_pool/psycopg_pool/null_pool.py b/psycopg_pool/psycopg_pool/null_pool.py index 884eab850..bf027e9dd 100644 --- a/psycopg_pool/psycopg_pool/null_pool.py +++ b/psycopg_pool/psycopg_pool/null_pool.py @@ -40,6 +40,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]): check: ConnectionCB[CT] | None = None, reset: ConnectionCB[CT] | None = None, name: str | None = None, + close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 60 * 60.0, @@ -49,6 +50,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]): num_workers: int = 3, ): # Note: min_size default value changed to 0. + # close_returns=True makes no sense super().__init__( conninfo, open=open, @@ -60,6 +62,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]): min_size=min_size, max_size=max_size, name=name, + close_returns=False, timeout=timeout, max_waiting=max_waiting, max_lifetime=max_lifetime, diff --git a/psycopg_pool/psycopg_pool/null_pool_async.py b/psycopg_pool/psycopg_pool/null_pool_async.py index a037597fa..a18c59c3f 100644 --- a/psycopg_pool/psycopg_pool/null_pool_async.py +++ b/psycopg_pool/psycopg_pool/null_pool_async.py @@ -36,6 +36,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]) check: AsyncConnectionCB[ACT] | None = None, reset: AsyncConnectionCB[ACT] | None = None, name: str | None = None, + close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 60 * 60.0, @@ -55,6 +56,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]) min_size=min_size, max_size=max_size, name=name, + close_returns=False, # close_returns=True makes no sense timeout=timeout, max_waiting=max_waiting, max_lifetime=max_lifetime, diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 8626c6c09..d5fd6eb0f 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -28,7 +28,7 @@ from .abc import CT, ConnectFailedCB, ConnectionCB from .base import AttemptWithBackoff, BasePool from .sched import Scheduler from .errors import PoolClosed, PoolTimeout, TooManyRequests -from ._compat import Self +from ._compat import PSYCOPG_VERSION, PoolConnection, Self from ._acompat import Condition, Event, Lock, Queue, Worker, current_thread_name from ._acompat import gather, sleep, spawn @@ -51,6 +51,7 @@ class ConnectionPool(Generic[CT], BasePool): check: ConnectionCB[CT] | None = None, reset: ConnectionCB[CT] | None = None, name: str | None = None, + close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 60 * 60.0, @@ -59,6 +60,14 @@ class ConnectionPool(Generic[CT], BasePool): reconnect_failed: ConnectFailedCB | None = None, num_workers: int = 3, ): + if close_returns and PSYCOPG_VERSION < (3, 3): + if connection_class is Connection: + connection_class = cast(type[CT], PoolConnection) + else: + raise TypeError( + "Using 'close_returns=True' and a non-standard 'connection_class' requires psycopg 3.3 or newer." + ) + self.connection_class = connection_class self._check = check self._configure = configure @@ -86,6 +95,7 @@ class ConnectionPool(Generic[CT], BasePool): min_size=min_size, max_size=max_size, name=name, + close_returns=close_returns, timeout=timeout, max_waiting=max_waiting, max_lifetime=max_lifetime, diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 5f882d4ee..ac4dfe83c 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -24,7 +24,7 @@ from psycopg.pq import TransactionStatus from .abc import ACT, AsyncConnectFailedCB, AsyncConnectionCB from .base import AttemptWithBackoff, BasePool from .errors import PoolClosed, PoolTimeout, TooManyRequests -from ._compat import Self +from ._compat import PSYCOPG_VERSION, AsyncPoolConnection, Self from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, agather, asleep from ._acompat import aspawn, current_task_name, ensure_async from .sched_async import AsyncScheduler @@ -51,6 +51,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): check: AsyncConnectionCB[ACT] | None = None, reset: AsyncConnectionCB[ACT] | None = None, name: str | None = None, + close_returns: bool = False, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 60 * 60.0, @@ -59,6 +60,15 @@ class AsyncConnectionPool(Generic[ACT], BasePool): reconnect_failed: AsyncConnectFailedCB | None = None, num_workers: int = 3, ): + if close_returns and PSYCOPG_VERSION < (3, 3): + if connection_class is AsyncConnection: + connection_class = cast(type[ACT], AsyncPoolConnection) + else: + raise TypeError( + "Using 'close_returns=True' and a non-standard 'connection_class'" + " requires psycopg 3.3 or newer." + ) + self.connection_class = connection_class self._check = check self._configure = configure @@ -86,6 +96,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): min_size=min_size, max_size=max_size, name=name, + close_returns=close_returns, timeout=timeout, max_waiting=max_waiting, max_lifetime=max_lifetime, diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 68064a014..df90c1c3e 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -26,6 +26,9 @@ except ImportError: pass +PSYCOPG_VERSION = tuple(map(int, psycopg.__version__.split(".", 2)[:2])) + + def test_default_sizes(dsn): with pool.ConnectionPool(dsn) as p: assert p.min_size == p.max_size == 4 @@ -1074,3 +1077,47 @@ def test_override_close(dsn): assert len(p._pool) == 2 assert conn.closed + + +def test_close_returns(dsn): + with pool.ConnectionPool(dsn, min_size=2, close_returns=True) as p: + p.wait() + assert len(p._pool) == 2 + conn = p.getconn() + assert not conn.closed + assert len(p._pool) == 1 + conn.close() + assert not conn.closed + assert len(p._pool) == 2 + + assert conn.closed + + +@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour") +def test_close_returns_custom_class(dsn): + + class MyConnection(psycopg.Connection): + pass + + with pool.ConnectionPool( + dsn, min_size=2, connection_class=MyConnection, close_returns=True + ) as p: + p.wait() + conn = p.getconn() + assert not conn.closed + assert len(p._pool) == 1 + conn.close() + assert not conn.closed + assert len(p._pool) == 2 + + assert conn.closed + + +@pytest.mark.skipif(PSYCOPG_VERSION >= (3, 3), reason="psycopg < 3.3 behaviour") +def test_close_returns_custom_class_old(dsn): + + class MyConnection(psycopg.Connection): + pass + + with pytest.raises(TypeError, match="close_returns=True"): + pool.ConnectionPool(dsn, connection_class=MyConnection, close_returns=True) diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index d4a2d277a..6cc20e672 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -25,6 +25,8 @@ except ImportError: if True: # ASYNC pytestmark = [pytest.mark.anyio] +PSYCOPG_VERSION = tuple(map(int, psycopg.__version__.split(".", 2)[:2])) + async def test_default_sizes(dsn): async with pool.AsyncConnectionPool(dsn) as p: @@ -1077,3 +1079,46 @@ async def test_override_close(dsn): assert len(p._pool) == 2 assert conn.closed + + +async def test_close_returns(dsn): + + async with pool.AsyncConnectionPool(dsn, min_size=2, close_returns=True) as p: + await p.wait() + assert len(p._pool) == 2 + conn = await p.getconn() + assert not conn.closed + assert len(p._pool) == 1 + await conn.close() + assert not conn.closed + assert len(p._pool) == 2 + + assert conn.closed + + +@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour") +async def test_close_returns_custom_class(dsn): + class MyConnection(psycopg.AsyncConnection): + pass + + async with pool.AsyncConnectionPool( + dsn, min_size=2, connection_class=MyConnection, close_returns=True + ) as p: + await p.wait() + conn = await p.getconn() + assert not conn.closed + assert len(p._pool) == 1 + await conn.close() + assert not conn.closed + assert len(p._pool) == 2 + + assert conn.closed + + +@pytest.mark.skipif(PSYCOPG_VERSION >= (3, 3), reason="psycopg < 3.3 behaviour") +async def test_close_returns_custom_class_old(dsn): + class MyConnection(psycopg.AsyncConnection): + pass + + with pytest.raises(TypeError, match="close_returns=True"): + pool.AsyncConnectionPool(dsn, connection_class=MyConnection, close_returns=True) diff --git a/tests/pool/test_pool_null.py b/tests/pool/test_pool_null.py index ac0ab05f8..332dfd2de 100644 --- a/tests/pool/test_pool_null.py +++ b/tests/pool/test_pool_null.py @@ -497,3 +497,12 @@ def test_cancellation_in_queue(dsn): with p.connection() as conn: cur = conn.execute("select 1") assert cur.fetchone() == (1,) + + +def test_close_returns(dsn): + # Mostly test the interface; close is close even if it goes via putconn(). + with pool.NullConnectionPool(dsn, close_returns=True) as p: + conn = p.getconn() + assert not conn.closed + conn.close() + assert conn.closed diff --git a/tests/pool/test_pool_null_async.py b/tests/pool/test_pool_null_async.py index 0a7fbc7c4..0e7111224 100644 --- a/tests/pool/test_pool_null_async.py +++ b/tests/pool/test_pool_null_async.py @@ -499,3 +499,12 @@ async def test_cancellation_in_queue(dsn): async with p.connection() as conn: cur = await conn.execute("select 1") assert await cur.fetchone() == (1,) + + +async def test_close_returns(dsn): + # Mostly test the interface; close is close even if it goes via putconn(). + async with pool.AsyncNullConnectionPool(dsn, close_returns=True) as p: + conn = await p.getconn() + assert not conn.closed + await conn.close() + assert conn.closed diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py index 7a78b2551..7aee1a81e 100755 --- a/tools/async_to_sync.py +++ b/tools/async_to_sync.py @@ -300,6 +300,7 @@ class RenameAsyncToSync(ast.NodeTransformer): # type: ignore "AsyncLibpqWriter": "LibpqWriter", "AsyncNullConnectionPool": "NullConnectionPool", "AsyncPipeline": "Pipeline", + "AsyncPoolConnection": "PoolConnection", "AsyncQueuedLibpqWriter": "QueuedLibpqWriter", "AsyncRawCursor": "RawCursor", "AsyncRawServerCursor": "RawServerCursor",