.. versionadded:: 3.2
+ .. automethod:: drain
+
+ .. versionadded:: 3.3
+
.. automethod:: get_stats
.. automethod:: pop_stats
psycopg_pool 3.3.0 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-- Add `!close_returns` for :ref:`integration with SQLAlchemy <pool-sqlalchemy>`
- (:ticket:`#1046`).
-- Allow `!conninfo` and `!kwargs` to be callable to allow connection
- parameters update (:ticket:`#851`).
+- Add `~ConnectionPool.drain()` method (:ticket:`#1215`).
+- Allow the `!conninfo` and `!kwargs` `ConnectionPool` parameters to be callable
+ to allow connection parameters update (:ticket:`#851`).
+- Add `!close_returns` `ConnectionPool` parameter for :ref:`integration with
+ SQLAlchemy <pool-sqlalchemy>` (:ticket:`#1046`).
Current release
self._pipeline: BasePipeline | None = None
+ # Time when the connection was created (currently only used by the pool)
+ self._created_at: float
# Time after which the connection should be closed
self._expire_at: float
self._nconns = min_size # currently in the pool, out, being prepared
self._pool = deque()
self._stats = Counter[str]()
+ self._drained_at = 0.0
# Min number of connections in the pool in a max_idle unit of time.
# It is reset periodically by the ShrinkPool scheduled task.
Add some randomness to avoid mass reconnection.
"""
- conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0)
+ conn._created_at = t = monotonic()
+ conn._expire_at = t + self._jitter(self.max_lifetime, -0.05, 0.0)
class AttemptWithBackoff:
self._putconn(conn, from_getconn=False)
+ def drain(self) -> None:
+ """
+ Remove all the connections from the pool and create new ones.
+
+ If a connection is currently out of the pool, it will be closed when
+ returned to the pool and replaced with a new one.
+
+ This method is useful to force a connection re-configuration, for
+ example when the adapters map changes after the pool was created.
+ """
+ with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+ self._drained_at = monotonic()
+
+ # Close the connections already in the pool, open new ones.
+ for conn in conns:
+ self._close_connection(conn)
+ self.run_task(AddConnection(self))
+
def _putconn(self, conn: CT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
return
# Check if the connection is past its best before date
- if conn._expire_at <= monotonic():
+ if conn._created_at <= self._drained_at or conn._expire_at <= monotonic():
logger.info("discarding expired connection")
self._close_connection(conn)
self.run_task(AddConnection(self))
await self._putconn(conn, from_getconn=False)
+ async def drain(self) -> None:
+ """
+ Remove all the connections from the pool and create new ones.
+
+ If a connection is currently out of the pool, it will be closed when
+ returned to the pool and replaced with a new one.
+
+ This method is useful to force a connection re-configuration, for
+ example when the adapters map changes after the pool was created.
+ """
+ async with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+ self._drained_at = monotonic()
+
+ # Close the connections already in the pool, open new ones.
+ for conn in conns:
+ await self._close_connection(conn)
+ self.run_task(AddConnection(self))
+
async def _putconn(self, conn: ACT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
return
# Check if the connection is past its best before date
- if conn._expire_at <= monotonic():
+ if conn._created_at <= self._drained_at or conn._expire_at <= monotonic():
logger.info("discarding expired connection")
await self._close_connection(conn)
self.run_task(AddConnection(self))
assert cur.fetchone() == (3,)
+@pytest.mark.crdb_skip("backend pid")
+def test_drain(pool_cls, dsn):
+ pids1 = set()
+ pids2 = set()
+ pids3 = set()
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p:
+ p.wait()
+
+ with p.connection() as conn:
+ pids1.add(conn.info.backend_pid)
+ with p.connection() as conn2:
+ pids1.add(conn2.info.backend_pid)
+ p.drain()
+ assert len(pids1) == 2
+
+ with p.connection() as conn:
+ pids2.add(conn.info.backend_pid)
+ with p.connection() as conn2:
+ pids2.add(conn2.info.backend_pid)
+
+ assert len(pids2) == 2
+
+ assert not pids1 & pids2
+
+ with p.connection() as conn:
+ pids3.add(conn.info.backend_pid)
+ with p.connection() as conn2:
+ pids3.add(conn2.info.backend_pid)
+
+ assert len(pids3) == 2
+ if pool_cls is not pool.NullConnectionPool:
+ assert pids2 == pids3
+
+
def min_size(pool_cls, num=1):
"""Return the minimum min_size supported by the pool class."""
if pool_cls is pool.ConnectionPool:
assert (await cur.fetchone()) == (3,)
+@pytest.mark.crdb_skip("backend pid")
+async def test_drain(pool_cls, dsn):
+ pids1 = set()
+ pids2 = set()
+ pids3 = set()
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p:
+ await p.wait()
+
+ async with p.connection() as conn:
+ pids1.add(conn.info.backend_pid)
+ async with p.connection() as conn2:
+ pids1.add(conn2.info.backend_pid)
+ await p.drain()
+ assert len(pids1) == 2
+
+ async with p.connection() as conn:
+ pids2.add(conn.info.backend_pid)
+ async with p.connection() as conn2:
+ pids2.add(conn2.info.backend_pid)
+
+ assert len(pids2) == 2
+
+ assert not pids1 & pids2
+
+ async with p.connection() as conn:
+ pids3.add(conn.info.backend_pid)
+ async with p.connection() as conn2:
+ pids3.add(conn2.info.backend_pid)
+
+ assert len(pids3) == 2
+ if pool_cls is not pool.AsyncNullConnectionPool:
+ assert pids2 == pids3
+
+
def min_size(pool_cls, num=1):
"""Return the minimum min_size supported by the pool class."""
if pool_cls is pool.AsyncConnectionPool: