@pytest.mark.slow
-def test_max_lifetime(dsn):
- with pool.ConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
- sleep(0.1)
- pids = []
- for i in range(5):
- with p.connection() as conn:
- pids.append(conn.pgconn.backend_pid)
- sleep(0.2)
+def test_max_lifetime(dsn, retries):
+ for retry in retries:
+ with retry:
+ with pool.ConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
+ sleep(0.1)
+ pids = []
+ for i in range(5):
+ with p.connection() as conn:
+ pids.append(conn.pgconn.backend_pid)
+ sleep(0.2)
- assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
+ assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
def delay_connection(monkeypatch, sec):
@pytest.mark.slow
-async def test_max_lifetime(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
- await asyncio.sleep(0.1)
- pids = []
- for i in range(5):
- async with p.connection() as conn:
- pids.append(conn.pgconn.backend_pid)
- await asyncio.sleep(0.2)
+async def test_max_lifetime(dsn, retries):
+ async for retry in retries:
+ with retry:
+ async with pool.AsyncConnectionPool(
+ dsn, minconn=1, max_lifetime=0.2
+ ) as p:
+ await asyncio.sleep(0.1)
+ pids = []
+ for i in range(5):
+ async with p.connection() as conn:
+ pids.append(conn.pgconn.backend_pid)
+ await asyncio.sleep(0.2)
- assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
+ assert pids[0] == pids[1] != pids[2] == pids[3] != pids[4], pids
def delay_connection(monkeypatch, sec):