Behaviour implemented via subclassing Psycopg 3.2.
Close #1046
"""Close the database connection."""
if self.closed:
return
+
+ pool = getattr(self, "_pool", None)
+ if pool and getattr(pool, "close_returns", False):
+ pool.putconn(self)
+ return
+
self._closed = True
# TODO: maybe send a cancel on close, if the connection is ACTIVE?
"""Close the database connection."""
if self.closed:
return
+
+ pool = getattr(self, "_pool", None)
+ if pool and getattr(pool, "close_returns", False):
+ await pool.putconn(self)
+ return
+
self._closed = True
# TODO: maybe send a cancel on close, if the connection is ACTIVE?
min_size: int,
max_size: int | None,
name: str | None,
+ close_returns: bool,
timeout: float,
max_waiting: int,
max_lifetime: float,
self.conninfo = conninfo
self.kwargs: dict[str, Any] = kwargs or {}
self.name = name
+ self.close_returns = close_returns
self._min_size = min_size
self._max_size = max_size
self.timeout = timeout
from .base import AttemptWithBackoff, BasePool
from .sched import Scheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Self
+from ._compat import PSYCOPG_VERSION, PoolConnection, Self
from ._acompat import Condition, Event, Lock, Queue, Worker, current_thread_name
from ._acompat import gather, sleep, spawn
check: ConnectionCB[CT] | None = None,
reset: ConnectionCB[CT] | None = None,
name: str | None = None,
+ close_returns: bool = False,
timeout: float = 30.0,
max_waiting: int = 0,
max_lifetime: float = 60 * 60.0,
reconnect_failed: ConnectFailedCB | None = None,
num_workers: int = 3,
):
+ if close_returns and PSYCOPG_VERSION < (3, 3):
+ if connection_class is Connection:
+ connection_class = cast(type[CT], PoolConnection)
+ else:
+ raise TypeError(
+ "Using 'close_returns=True' and a non-standard 'connection_class' requires psycopg 3.3 or newer."
+ )
+
self.connection_class = connection_class
self._check = check
self._configure = configure
min_size=min_size,
max_size=max_size,
name=name,
+ close_returns=close_returns,
timeout=timeout,
max_waiting=max_waiting,
max_lifetime=max_lifetime,
from .abc import ACT, AsyncConnectFailedCB, AsyncConnectionCB
from .base import AttemptWithBackoff, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Self
+from ._compat import PSYCOPG_VERSION, AsyncPoolConnection, Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, agather, asleep
from ._acompat import aspawn, current_task_name, ensure_async
from .sched_async import AsyncScheduler
check: AsyncConnectionCB[ACT] | None = None,
reset: AsyncConnectionCB[ACT] | None = None,
name: str | None = None,
+ close_returns: bool = False,
timeout: float = 30.0,
max_waiting: int = 0,
max_lifetime: float = 60 * 60.0,
reconnect_failed: AsyncConnectFailedCB | None = None,
num_workers: int = 3,
):
+ if close_returns and PSYCOPG_VERSION < (3, 3):
+ if connection_class is AsyncConnection:
+ connection_class = cast(type[ACT], AsyncPoolConnection)
+ else:
+ raise TypeError(
+ "Using 'close_returns=True' and a non-standard 'connection_class'"
+ " requires psycopg 3.3 or newer."
+ )
+
self.connection_class = connection_class
self._check = check
self._configure = configure
min_size=min_size,
max_size=max_size,
name=name,
+ close_returns=close_returns,
timeout=timeout,
max_waiting=max_waiting,
max_lifetime=max_lifetime,
pass
+PSYCOPG_VERSION = tuple(map(int, psycopg.__version__.split(".", 2)[:2]))
+
+
def test_default_sizes(dsn):
with pool.ConnectionPool(dsn) as p:
assert p.min_size == p.max_size == 4
assert len(p._pool) == 2
assert conn.closed
+
+
+def test_close_returns(dsn):
+ with pool.ConnectionPool(dsn, min_size=2, close_returns=True) as p:
+ p.wait()
+ assert len(p._pool) == 2
+ conn = p.getconn()
+ assert not conn.closed
+ assert len(p._pool) == 1
+ conn.close()
+ assert not conn.closed
+ assert len(p._pool) == 2
+
+ assert conn.closed
+
+
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+def test_close_returns_custom_class(dsn):
+
+ class MyConnection(psycopg.Connection):
+ pass
+
+ with pool.ConnectionPool(
+ dsn, min_size=2, connection_class=MyConnection, close_returns=True
+ ) as p:
+ p.wait()
+ conn = p.getconn()
+ assert not conn.closed
+ assert len(p._pool) == 1
+ conn.close()
+ assert not conn.closed
+ assert len(p._pool) == 2
+
+ assert conn.closed
+
+
+@pytest.mark.skipif(PSYCOPG_VERSION >= (3, 3), reason="psycopg < 3.3 behaviour")
+def test_close_returns_custom_class_old(dsn):
+
+ class MyConnection(psycopg.Connection):
+ pass
+
+ with pytest.raises(TypeError, match="close_returns=True"):
+ pool.ConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
if True: # ASYNC
pytestmark = [pytest.mark.anyio]
+PSYCOPG_VERSION = tuple(map(int, psycopg.__version__.split(".", 2)[:2]))
+
async def test_default_sizes(dsn):
async with pool.AsyncConnectionPool(dsn) as p:
assert len(p._pool) == 2
assert conn.closed
+
+
+async def test_close_returns(dsn):
+
+ async with pool.AsyncConnectionPool(dsn, min_size=2, close_returns=True) as p:
+ await p.wait()
+ assert len(p._pool) == 2
+ conn = await p.getconn()
+ assert not conn.closed
+ assert len(p._pool) == 1
+ await conn.close()
+ assert not conn.closed
+ assert len(p._pool) == 2
+
+ assert conn.closed
+
+
+@pytest.mark.skipif(PSYCOPG_VERSION < (3, 3), reason="psycopg >= 3.3 behaviour")
+async def test_close_returns_custom_class(dsn):
+ class MyConnection(psycopg.AsyncConnection):
+ pass
+
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=2, connection_class=MyConnection, close_returns=True
+ ) as p:
+ await p.wait()
+ conn = await p.getconn()
+ assert not conn.closed
+ assert len(p._pool) == 1
+ await conn.close()
+ assert not conn.closed
+ assert len(p._pool) == 2
+
+ assert conn.closed
+
+
+@pytest.mark.skipif(PSYCOPG_VERSION >= (3, 3), reason="psycopg < 3.3 behaviour")
+async def test_close_returns_custom_class_old(dsn):
+ class MyConnection(psycopg.AsyncConnection):
+ pass
+
+ with pytest.raises(TypeError, match="close_returns=True"):
+ pool.AsyncConnectionPool(dsn, connection_class=MyConnection, close_returns=True)
"AsyncLibpqWriter": "LibpqWriter",
"AsyncNullConnectionPool": "NullConnectionPool",
"AsyncPipeline": "Pipeline",
+ "AsyncPoolConnection": "PoolConnection",
"AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
"AsyncRawCursor": "RawCursor",
"AsyncRawServerCursor": "RawServerCursor",