@pytest.mark.slow
-def test_queue_timeout(dsn):
+def test_queue_timeout(dsn, retries):
def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- results = []
- errors = []
+ for retry in retries:
+ with retry:
+ results = []
+ errors = []
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- [t.start() for t in ts]
- [t.join() for t in ts]
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ [t.start() for t in ts]
+ [t.join() for t in ts]
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
-def test_queue_timeout_override(dsn):
+def test_queue_timeout_override(dsn, retries):
def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- results = []
- errors = []
+ for retry in retries:
+ with retry:
+ results = []
+ errors = []
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- [t.start() for t in ts]
- [t.join() for t in ts]
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ [t.start() for t in ts]
+ [t.join() for t in ts]
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
def test_broken_reconnect(dsn):
@pytest.mark.slow
-async def test_queue_timeout(dsn):
+async def test_queue_timeout(dsn, retries):
async def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- results = []
- errors = []
+ async for retry in retries:
+ with retry:
+ results = []
+ errors = []
- async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=2, timeout=0.1
+ ) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
-async def test_queue_timeout_override(dsn):
+async def test_queue_timeout_override(dsn, retries):
async def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- results = []
- errors = []
+ async for retry in retries:
+ with retry:
+ results = []
+ errors = []
- async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=2, timeout=0.1
+ ) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
async def test_broken_reconnect(dsn):