From: Daniele Varrazzo Date: Sun, 28 Feb 2021 03:35:56 +0000 (+0100) Subject: Add max_lifetime to pool connections X-Git-Tag: 3.0.dev0~87^2~34 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=168acce58a8703904686dedd48d64e5da119a6e2;p=thirdparty%2Fpsycopg.git Add max_lifetime to pool connections --- diff --git a/psycopg3/psycopg3/connection.py b/psycopg3/psycopg3/connection.py index 3997061e3..d8ca89750 100644 --- a/psycopg3/psycopg3/connection.py +++ b/psycopg3/psycopg3/connection.py @@ -129,6 +129,9 @@ class BaseConnection(AdaptContext): # apart a connection in the pool too (when _pool = None) self._pool: Optional["BasePool[Any]"] + # Time after which the connection should be closed + self._expire_at: float + def __del__(self) -> None: # If fails on connection we might not have this attribute yet if not hasattr(self, "pgconn"): diff --git a/psycopg3/psycopg3/pool/async_pool.py b/psycopg3/psycopg3/pool/async_pool.py index c024214d4..c3494dd41 100644 --- a/psycopg3/psycopg3/pool/async_pool.py +++ b/psycopg3/psycopg3/pool/async_pool.py @@ -283,6 +283,10 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): conn = await AsyncConnection.connect(self.conninfo, **self.kwargs) await self.configure(conn) conn._pool = self + # Set an expiry date, with some randomness to avoid mass reconnection + conn._expire_at = monotonic() + self._jitter( + self.max_lifetime, -0.05, 0.0 + ) return conn async def _add_connection( @@ -329,10 +333,18 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): await self._reset_connection(conn) if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: # Connection no more in working state: create a new one. + self.run_task(tasks.AddConnection(self)) logger.warning("discarding closed connection: %s", conn) + return + + # Check if the connection is past its best before date + if conn._expire_at <= monotonic(): self.run_task(tasks.AddConnection(self)) - else: - await self._add_to_pool(conn) + logger.info("discarding expired connection") + await conn.close() + return + + await self._add_to_pool(conn) async def _add_to_pool(self, conn: AsyncConnection) -> None: """ diff --git a/psycopg3/psycopg3/pool/base.py b/psycopg3/psycopg3/pool/base.py index c56b116c5..c342e69d3 100644 --- a/psycopg3/psycopg3/pool/base.py +++ b/psycopg3/psycopg3/pool/base.py @@ -35,6 +35,7 @@ class BasePool(Generic[ConnectionType]): maxconn: Optional[int] = None, name: Optional[str] = None, timeout: float = 30.0, + max_lifetime: float = 60 * 60.0, max_idle: float = 10 * 60.0, reconnect_timeout: float = 5 * 60.0, reconnect_failed: Optional[ @@ -62,6 +63,7 @@ class BasePool(Generic[ConnectionType]): self._maxconn = maxconn self.timeout = timeout self.reconnect_timeout = reconnect_timeout + self.max_lifetime = max_lifetime self.max_idle = max_idle self.num_workers = num_workers diff --git a/psycopg3/psycopg3/pool/pool.py b/psycopg3/psycopg3/pool/pool.py index 71c07ebc7..5154f2c84 100644 --- a/psycopg3/psycopg3/pool/pool.py +++ b/psycopg3/psycopg3/pool/pool.py @@ -261,6 +261,10 @@ class ConnectionPool(BasePool[Connection]): conn = Connection.connect(self.conninfo, **self.kwargs) self.configure(conn) conn._pool = self + # Set an expiry date, with some randomness to avoid mass reconnection + conn._expire_at = monotonic() + self._jitter( + self.max_lifetime, -0.05, 0.0 + ) return conn def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None: @@ -305,10 +309,18 @@ class ConnectionPool(BasePool[Connection]): self._reset_connection(conn) if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: # Connection no more in working state: create a new one. + self.run_task(tasks.AddConnection(self)) logger.warning("discarding closed connection: %s", conn) + return + + # Check if the connection is past its best before date + if conn._expire_at <= monotonic(): self.run_task(tasks.AddConnection(self)) - else: - self._add_to_pool(conn) + logger.info("discarding expired connection") + conn.close() + return + + self._add_to_pool(conn) def _add_to_pool(self, conn: Connection) -> None: """ diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 49792724a..7cfc9435d 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -15,7 +15,8 @@ def test_defaults(dsn): with pool.ConnectionPool(dsn) as p: assert p.minconn == p.maxconn == 4 assert p.timeout == 30 - assert p.max_idle == 600 + assert p.max_idle == 10 * 60 + assert p.max_lifetime == 60 * 60 assert p.num_workers == 3 @@ -643,6 +644,19 @@ def test_jitter(): assert 35 < max(rnds) < 36 +@pytest.mark.slow +def test_max_lifetime(dsn): + with pool.ConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p: + sleep(0.1) + pids = [] + for i in range(5): + with p.connection() as conn: + pids.append(conn.pgconn.backend_pid) + sleep(0.2) + + assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids + + def delay_connection(monkeypatch, sec): """ Return a _connect_gen function delayed by the amount of seconds diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 7a19e036b..60c91c126 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -24,7 +24,8 @@ async def test_defaults(dsn): async with pool.AsyncConnectionPool(dsn) as p: assert p.minconn == p.maxconn == 4 assert p.timeout == 30 - assert p.max_idle == 600 + assert p.max_idle == 10 * 60 + assert p.max_lifetime == 60 * 60 assert p.num_workers == 3 @@ -677,6 +678,19 @@ def test_jitter(): assert 35 < max(rnds) < 36 +@pytest.mark.slow +async def test_max_lifetime(dsn): + async with pool.AsyncConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p: + await asyncio.sleep(0.1) + pids = [] + for i in range(5): + async with p.connection() as conn: + pids.append(conn.pgconn.backend_pid) + await asyncio.sleep(0.2) + + assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids + + def delay_connection(monkeypatch, sec): """ Return a _connect_gen function delayed by the amount of seconds