# Only set when the connection comes from a pool, so we can tell
# apart a connection in the pool too (when _pool = None)
self._pool: Optional["BasePool[Any]"]
+ # Time after which the connection should be closed
+ self._expire_at: float
+
def __del__(self) -> None:
# If fails on connection we might not have this attribute yet
if not hasattr(self, "pgconn"):
conn = await AsyncConnection.connect(self.conninfo, **self.kwargs)
await self.configure(conn)
conn._pool = self
+ # Set an expiry date, with some randomness to avoid mass reconnection
+ conn._expire_at = monotonic() + self._jitter(
+ self.max_lifetime, -0.05, 0.0
+ )
return conn
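# The expiry computation above calls a _jitter() helper that is not shown in
# this diff.  A minimal sketch consistent with the call
# self._jitter(self.max_lifetime, -0.05, 0.0) could be the following (the
# formula is an assumption, not necessarily the pool's actual implementation):

from random import random

def _jitter(value: float, min_pc: float, max_pc: float) -> float:
    """Scale value by a random percentage chosen between min_pc and max_pc."""
    return value * (1.0 + min_pc + (max_pc - min_pc) * random())

# With (-0.05, 0.0) the result lands between 95% and 100% of max_lifetime, so
# connections created together do not all expire at the same instant.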
async def _add_connection(
await self._reset_connection(conn)
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
# Connection no more in working state: create a new one.
+ self.run_task(tasks.AddConnection(self))
logger.warning("discarding closed connection: %s", conn)
+ return
+
+ # Check if the connection is past its best before date
+ if conn._expire_at <= monotonic():
self.run_task(tasks.AddConnection(self))
- else:
- await self._add_to_pool(conn)
+ logger.info("discarding expired connection")
+ await conn.close()
+ return
+
+ await self._add_to_pool(conn)
async def _add_to_pool(self, conn: AsyncConnection) -> None:
"""
maxconn: Optional[int] = None,
name: Optional[str] = None,
timeout: float = 30.0,
+ max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
reconnect_failed: Optional[
self._maxconn = maxconn
self.timeout = timeout
self.reconnect_timeout = reconnect_timeout
+ self.max_lifetime = max_lifetime
self.max_idle = max_idle
self.num_workers = num_workers
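# Usage sketch for the new parameter, following the pool API exercised by the
# tests below (the import path and dsn are assumptions for the example):

from psycopg3 import pool

dsn = "dbname=test"
with pool.ConnectionPool(dsn, minconn=4, max_lifetime=30 * 60) as p:
    with p.connection() as conn:
        conn.execute("select 1")

# Server connections are now replaced roughly every 28.5-30 minutes
# (30 minutes minus up to 5% jitter) instead of after the default hour.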
conn = Connection.connect(self.conninfo, **self.kwargs)
self.configure(conn)
conn._pool = self
+ # Set an expiry date, with some randomness to avoid mass reconnection
+ conn._expire_at = monotonic() + self._jitter(
+ self.max_lifetime, -0.05, 0.0
+ )
return conn
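# With the defaults above (max_lifetime = 60 * 60.0 seconds and, assuming
# _jitter() shifts its argument by a percentage between its two bounds,
# a jitter of -5%..0%), the expiry window works out to:
#   expire_at = monotonic() + 3420 .. 3600 seconds
# i.e. connections opened together are recycled over a ~3 minute spread
# instead of all reconnecting at the same moment.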
def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
self._reset_connection(conn)
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
# Connection no more in working state: create a new one.
+ self.run_task(tasks.AddConnection(self))
logger.warning("discarding closed connection: %s", conn)
+ return
+
+ # Check if the connection is past its best before date
+ if conn._expire_at <= monotonic():
self.run_task(tasks.AddConnection(self))
- else:
- self._add_to_pool(conn)
+ logger.info("discarding expired connection")
+ conn.close()
+ return
+
+ self._add_to_pool(conn)
def _add_to_pool(self, conn: Connection) -> None:
"""
with pool.ConnectionPool(dsn) as p:
assert p.minconn == p.maxconn == 4
assert p.timeout == 30
- assert p.max_idle == 600
+ assert p.max_idle == 10 * 60
+ assert p.max_lifetime == 60 * 60
assert p.num_workers == 3
assert 35 < max(rnds) < 36
+@pytest.mark.slow
+def test_max_lifetime(dsn):
+ with pool.ConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
+ sleep(0.1)
+ pids = []
+ for i in range(5):
+ with p.connection() as conn:
+ pids.append(conn.pgconn.backend_pid)
+ sleep(0.2)
+
+ assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
+
+
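# Rough timeline behind the assertion above (a sketch; exact instants shift
# with connection-setup latency and the -5%..0% jitter).  The lifetime check
# only runs when a connection is returned to the pool:
#   t≈0.0   first connection created, expires around t≈0.2
#   i=0     returned at t≈0.1 -> still valid, kept in the pool
#   i=1     returned at t≈0.3 -> expired, discarded, replacement created
#   i=2,3   repeat the same pattern on the replacement, and so on,
# which is why each backend pid is expected to show up for two iterations.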
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
async with pool.AsyncConnectionPool(dsn) as p:
assert p.minconn == p.maxconn == 4
assert p.timeout == 30
- assert p.max_idle == 600
+ assert p.max_idle == 10 * 60
+ assert p.max_lifetime == 60 * 60
assert p.num_workers == 3
assert 35 < max(rnds) < 36
+@pytest.mark.slow
+async def test_max_lifetime(dsn):
+ async with pool.AsyncConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
+ await asyncio.sleep(0.1)
+ pids = []
+ for i in range(5):
+ async with p.connection() as conn:
+ pids.append(conn.pgconn.backend_pid)
+ await asyncio.sleep(0.2)
+
+ assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds