From 4e1cc65540ffcf85c3681d11fd6e742053bede65 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 11 Jul 2025 22:02:11 +0200 Subject: [PATCH] feat(pool): add `drain()` method Add the creation timestamp to the connection to verify that it should be immediately discarded on return. --- docs/api/pool.rst | 4 +++ docs/news_pool.rst | 9 ++++--- psycopg/psycopg/_connection_base.py | 2 ++ psycopg_pool/psycopg_pool/base.py | 4 ++- psycopg_pool/psycopg_pool/pool.py | 22 +++++++++++++++- psycopg_pool/psycopg_pool/pool_async.py | 22 +++++++++++++++- tests/pool/test_pool_common.py | 34 +++++++++++++++++++++++++ tests/pool/test_pool_common_async.py | 34 +++++++++++++++++++++++++ 8 files changed, 124 insertions(+), 7 deletions(-) diff --git a/docs/api/pool.rst b/docs/api/pool.rst index f06fa12e2..ca07b1e87 100644 --- a/docs/api/pool.rst +++ b/docs/api/pool.rst @@ -248,6 +248,10 @@ The `!ConnectionPool` class .. versionadded:: 3.2 + .. automethod:: drain + + .. versionadded:: 3.3 + .. automethod:: get_stats .. automethod:: pop_stats diff --git a/docs/news_pool.rst b/docs/news_pool.rst index 9136e6d65..657cf1b34 100644 --- a/docs/news_pool.rst +++ b/docs/news_pool.rst @@ -13,10 +13,11 @@ Future releases psycopg_pool 3.3.0 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -- Add `!close_returns` for :ref:`integration with SQLAlchemy ` - (:ticket:`#1046`). -- Allow `!conninfo` and `!kwargs` to be callable to allow connection - parameters# update (:ticket:`#851`). +- Add `~ConnectionPool.drain()` method (:ticket:`#1215`). +- Allow the `!conninfo` and `!kwargs` `ConnectionPool` parameters to be callable + to allow connection parameters update (:ticket:`#851`). +- Add `!close_returns` `ConnectionPool` parameter for :ref:`integration with + SQLAlchemy ` (:ticket:`#1046`). Current release diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index 07b7e523a..566ee4ab1 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -127,6 +127,8 @@ class BaseConnection(Generic[Row]): self._pipeline: BasePipeline | None = None + # Time when the connection was created (currently only used by the pool) + self._created_at: float # Time after which the connection should be closed self._expire_at: float diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py index 745c995c7..9f68098fb 100644 --- a/psycopg_pool/psycopg_pool/base.py +++ b/psycopg_pool/psycopg_pool/base.py @@ -79,6 +79,7 @@ class BasePool: self._nconns = min_size # currently in the pool, out, being prepared self._pool = deque() self._stats = Counter[str]() + self._drained_at = 0.0 # Min number of connections in the pool in a max_idle unit of time. # It is reset periodically by the ShrinkPool scheduled task. @@ -196,7 +197,8 @@ class BasePool: Add some randomness to avoid mass reconnection. """ - conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0) + conn._created_at = t = monotonic() + conn._expire_at = t + self._jitter(self.max_lifetime, -0.05, 0.0) class AttemptWithBackoff: diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index a5f062fda..245907f67 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -325,6 +325,26 @@ class ConnectionPool(Generic[CT], BasePool): self._putconn(conn, from_getconn=False) + def drain(self) -> None: + """ + Remove all the connections from the pool and create new ones. + + If a connection is currently out of the pool it will be closed when + returned to the pool and replaced with a new one. + + This method is useful to force a connection re-configuration, for + example when the adapters map changes after the pool was created. + """ + with self._lock: + conns = list(self._pool) + self._pool.clear() + self._drained_at = monotonic() + + # Close the connection already in the pool, open new ones. + for conn in conns: + self._close_connection(conn) + self.run_task(AddConnection(self)) + def _putconn(self, conn: CT, from_getconn: bool) -> None: # Use a worker to perform eventual maintenance work in a separate task if self._reset: @@ -715,7 +735,7 @@ class ConnectionPool(Generic[CT], BasePool): return # Check if the connection is past its best before date - if conn._expire_at <= monotonic(): + if conn._created_at <= self._drained_at or conn._expire_at <= monotonic(): logger.info("discarding expired connection") self._close_connection(conn) self.run_task(AddConnection(self)) diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 6ea1b3c88..254e1367d 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -363,6 +363,26 @@ class AsyncConnectionPool(Generic[ACT], BasePool): await self._putconn(conn, from_getconn=False) + async def drain(self) -> None: + """ + Remove all the connections from the pool and create new ones. + + If a connection is currently out of the pool it will be closed when + returned to the pool and replaced with a new one. + + This method is useful to force a connection re-configuration, for + example when the adapters map changes after the pool was created. + """ + async with self._lock: + conns = list(self._pool) + self._pool.clear() + self._drained_at = monotonic() + + # Close the connection already in the pool, open new ones. + for conn in conns: + await self._close_connection(conn) + self.run_task(AddConnection(self)) + async def _putconn(self, conn: ACT, from_getconn: bool) -> None: # Use a worker to perform eventual maintenance work in a separate task if self._reset: @@ -773,7 +793,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): return # Check if the connection is past its best before date - if conn._expire_at <= monotonic(): + if conn._created_at <= self._drained_at or conn._expire_at <= monotonic(): logger.info("discarding expired connection") await self._close_connection(conn) self.run_task(AddConnection(self)) diff --git a/tests/pool/test_pool_common.py b/tests/pool/test_pool_common.py index 259e7d3ac..1d0732427 100644 --- a/tests/pool/test_pool_common.py +++ b/tests/pool/test_pool_common.py @@ -746,6 +746,40 @@ def test_cancel_on_rollback(pool_cls, dsn, monkeypatch): assert cur.fetchone() == (3,) +@pytest.mark.crdb_skip("backend pid") +def test_drain(pool_cls, dsn): + pids1 = set() + pids2 = set() + pids3 = set() + with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p: + p.wait() + + with p.connection() as conn: + pids1.add(conn.info.backend_pid) + with p.connection() as conn2: + pids1.add(conn2.info.backend_pid) + p.drain() + assert len(pids1) == 2 + + with p.connection() as conn: + pids2.add(conn.info.backend_pid) + with p.connection() as conn2: + pids2.add(conn2.info.backend_pid) + + assert len(pids2) == 2 + + assert not pids1 & pids2 + + with p.connection() as conn: + pids3.add(conn.info.backend_pid) + with p.connection() as conn2: + pids3.add(conn2.info.backend_pid) + + assert len(pids3) == 2 + if pool_cls is not pool.NullConnectionPool: + assert pids2 == pids3 + + def min_size(pool_cls, num=1): """Return the minimum min_size supported by the pool class.""" if pool_cls is pool.ConnectionPool: diff --git a/tests/pool/test_pool_common_async.py b/tests/pool/test_pool_common_async.py index 3dc01a238..22e56ab19 100644 --- a/tests/pool/test_pool_common_async.py +++ b/tests/pool/test_pool_common_async.py @@ -759,6 +759,40 @@ async def test_cancel_on_rollback(pool_cls, dsn, monkeypatch): assert (await cur.fetchone()) == (3,) +@pytest.mark.crdb_skip("backend pid") +async def test_drain(pool_cls, dsn): + pids1 = set() + pids2 = set() + pids3 = set() + async with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p: + await p.wait() + + async with p.connection() as conn: + pids1.add(conn.info.backend_pid) + async with p.connection() as conn2: + pids1.add(conn2.info.backend_pid) + await p.drain() + assert len(pids1) == 2 + + async with p.connection() as conn: + pids2.add(conn.info.backend_pid) + async with p.connection() as conn2: + pids2.add(conn2.info.backend_pid) + + assert len(pids2) == 2 + + assert not pids1 & pids2 + + async with p.connection() as conn: + pids3.add(conn.info.backend_pid) + async with p.connection() as conn2: + pids3.add(conn2.info.backend_pid) + + assert len(pids3) == 2 + if pool_cls is not pool.AsyncNullConnectionPool: + assert pids2 == pids3 + + def min_size(pool_cls, num=1): """Return the minimum min_size supported by the pool class.""" if pool_cls is pool.AsyncConnectionPool: -- 2.47.3