Now we just retry all failing tests automatically, so the manual `retries` fixture (and its tenacity dependency) is removed from conftest, the package extras, the pinned requirements, and the mypy overrides, and every test drops the `for retry in retries:` wrapper.
"pytest-asyncio >= 0.17",
"pytest-cov >= 3.0",
"pytest-randomly >= 3.10",
- "tenacity >= 8.0",
],
# Requirements needed for development
"dev": [
]
ignore_missing_imports = true
-[[tool.mypy.overrides]]
-module = "tenacity.*"
-implicit_reexport = true
-
[[tool.mypy.overrides]]
module = "uvloop"
ignore_missing_imports = true
import sys
import asyncio
-import inspect
import pytest
return [f"asyncio loop: {loop}"]
-@pytest.fixture
-def retries(request):
- """Retry a block in a test a few times before giving up."""
- import tenacity
-
- if inspect.iscoroutinefunction(request.function):
- return tenacity.AsyncRetrying(
- reraise=True, stop=tenacity.stop_after_attempt(3)
- )
- else:
- return tenacity.Retrying(
- reraise=True, stop=tenacity.stop_after_attempt(3)
- )
-
-
def pytest_sessionstart(session):
# Configure the async loop.
loop = session.config.getoption("--loop")
pytest-asyncio == 0.17.0
pytest-cov == 3.0.0
pytest-randomly == 3.10.0
-tenacity == 8.0.0
# From the 'dev' extra
black == 21.12b0
@pytest.mark.slow
@pytest.mark.timing
-def test_queue(dsn, retries):
+def test_queue(dsn):
def worker(n):
t0 = time()
with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- with NullConnectionPool(dsn, max_size=2) as p:
- p.wait()
- ts = [Thread(target=worker, args=(i,)) for i in range(6)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ results: List[Tuple[int, float, int]] = []
+ with NullConnectionPool(dsn, max_size=2) as p:
+ p.wait()
+ ts = [Thread(target=worker, args=(i,)) for i in range(6)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.2), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set(r[2] for r in results)) == 2, results
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_queue_timeout(dsn, retries):
+def test_queue_timeout(dsn):
def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_queue_timeout_override(dsn, retries):
+def test_queue_timeout_override(dsn):
def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
def test_broken_reconnect(dsn):
@pytest.mark.slow
@pytest.mark.timing
-def test_stats_usage(dsn, retries):
+def test_stats_usage(dsn):
def worker(n):
try:
with p.connection(timeout=0.3) as conn:
except PoolTimeout:
pass
- for retry in retries:
- with retry:
- with NullConnectionPool(dsn, max_size=3) as p:
- p.wait(2.0)
-
- ts = [Thread(target=worker, args=(i,)) for i in range(7)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- with p.connection() as conn:
- conn.close()
- p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
+ with NullConnectionPool(dsn, max_size=3) as p:
+ p.wait(2.0)
+
+ ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ with p.connection() as conn:
+ conn.close()
+ p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue(dsn, retries):
+async def test_queue(dsn):
async def worker(n):
t0 = time()
async with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- async with AsyncNullConnectionPool(dsn, max_size=2) as p:
- await p.wait()
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
+ results: List[Tuple[int, float, int]] = []
+ async with AsyncNullConnectionPool(dsn, max_size=2) as p:
+ await p.wait()
+ ts = [create_task(worker(i)) for i in range(6)]
+ await asyncio.gather(*ts)
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.2), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set(r[2] for r in results)) == 2, results
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue_timeout(dsn, retries):
+async def test_queue_timeout(dsn):
async def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- async with AsyncNullConnectionPool(
- dsn, max_size=2, timeout=0.1
- ) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue_timeout_override(dsn, retries):
+async def test_queue_timeout_override(dsn):
async def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- async with AsyncNullConnectionPool(
- dsn, max_size=2, timeout=0.1
- ) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
async def test_broken_reconnect(dsn):
@pytest.mark.slow
@pytest.mark.timing
-async def test_stats_usage(dsn, retries):
+async def test_stats_usage(dsn):
async def worker(n):
try:
async with p.connection(timeout=0.3) as conn:
except PoolTimeout:
pass
- async for retry in retries:
- with retry:
- async with AsyncNullConnectionPool(dsn, max_size=3) as p:
- await p.wait(2.0)
-
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.gather(*ts)
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- async with p.connection() as conn:
- await conn.close()
- await p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- async with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
+ async with AsyncNullConnectionPool(dsn, max_size=3) as p:
+ await p.wait(2.0)
+
+ ts = [create_task(worker(i)) for i in range(7)]
+ await asyncio.gather(*ts)
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ async with p.connection() as conn:
+ await conn.close()
+ await p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ async with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_concurrent_filling(dsn, monkeypatch, retries):
+def test_concurrent_filling(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
def add_time(self, conn):
add_orig = pool.ConnectionPool._add_to_pool
monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
- for retry in retries:
- with retry:
- times: List[float] = []
- t0 = time()
+ times: List[float] = []
+ t0 = time()
- with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
- p.wait(1.0)
- want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
- assert len(times) == len(want_times)
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
+ p.wait(1.0)
+ want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+ assert len(times) == len(want_times)
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_queue(dsn, retries):
+def test_queue(dsn):
def worker(n):
t0 = time()
with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- with pool.ConnectionPool(dsn, min_size=2) as p:
- p.wait()
- ts = [Thread(target=worker, args=(i,)) for i in range(6)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ results: List[Tuple[int, float, int]] = []
+ with pool.ConnectionPool(dsn, min_size=2) as p:
+ p.wait()
+ ts = [Thread(target=worker, args=(i,)) for i in range(6)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set(r[2] for r in results)) == 2, results
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_queue_timeout(dsn, retries):
+def test_queue_timeout(dsn):
def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_queue_timeout_override(dsn, retries):
+def test_queue_timeout_override(dsn):
def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
def test_broken_reconnect(dsn):
(0, [0.35, 0.45, 0.55, 0.60, 0.65, 0.70, 0.80, 0.85]),
],
)
-def test_grow(dsn, monkeypatch, retries, min_size, want_times):
+def test_grow(dsn, monkeypatch, min_size, want_times):
delay_connection(monkeypatch, 0.1)
def worker(n):
t1 = time()
results.append((n, t1 - t0))
- for retry in retries:
- with retry:
- with pool.ConnectionPool(
- dsn, min_size=min_size, max_size=4, num_workers=3
- ) as p:
- p.wait(1.0)
- results: List[Tuple[int, float]] = []
+ with pool.ConnectionPool(
+ dsn, min_size=min_size, max_size=4, num_workers=3
+ ) as p:
+ p.wait(1.0)
+ results: List[Tuple[int, float]] = []
- ts = [
- Thread(target=worker, args=(i,))
- for i in range(len(want_times))
- ]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [Thread(target=worker, args=(i,)) for i in range(len(want_times))]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
- times = [item[1] for item in results]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ times = [item[1] for item in results]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
-def test_reconnect(proxy, caplog, monkeypatch, retries):
+def test_reconnect(proxy, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
- for retry in retries:
- with retry:
- caplog.clear()
- proxy.start()
- with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
- p.wait(2.0)
- proxy.stop()
-
- with pytest.raises(psycopg.OperationalError):
- with p.connection() as conn:
- conn.execute("select 1")
-
- sleep(1.0)
- proxy.start()
- p.wait()
-
- with p.connection() as conn:
- conn.execute("select 1")
-
- assert "BAD" in caplog.messages[0]
- times = [rec.created for rec in caplog.records]
- assert times[1] - times[0] < 0.05
- deltas = [
- times[i + 1] - times[i] for i in range(1, len(times) - 1)
- ]
- assert len(deltas) == 3
- want = 0.1
- for delta in deltas:
- assert delta == pytest.approx(want, 0.05), deltas
- want *= 2
+ caplog.clear()
+ proxy.start()
+ with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
+ p.wait(2.0)
+ proxy.stop()
+
+ with pytest.raises(psycopg.OperationalError):
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ sleep(1.0)
+ proxy.start()
+ p.wait()
+
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ assert "BAD" in caplog.messages[0]
+ times = [rec.created for rec in caplog.records]
+ assert times[1] - times[0] < 0.05
+ deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
@pytest.mark.slow
@pytest.mark.slow
-def test_uniform_use(dsn, retries):
- for retry in retries:
- with retry:
- with pool.ConnectionPool(dsn, min_size=4) as p:
- counts = Counter[int]()
- for i in range(8):
- with p.connection() as conn:
- sleep(0.1)
- counts[id(conn)] += 1
+def test_uniform_use(dsn):
+ with pool.ConnectionPool(dsn, min_size=4) as p:
+ counts = Counter[int]()
+ for i in range(8):
+ with p.connection() as conn:
+ sleep(0.1)
+ counts[id(conn)] += 1
- assert len(counts) == 4
- assert set(counts.values()) == set([2])
+ assert len(counts) == 4
+ assert set(counts.values()) == set([2])
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-def test_stats_usage(dsn, retries):
+def test_stats_usage(dsn):
def worker(n):
try:
with p.connection(timeout=0.3) as conn:
except pool.PoolTimeout:
pass
- for retry in retries:
- with retry:
- with pool.ConnectionPool(dsn, min_size=3) as p:
- p.wait(2.0)
-
- ts = [Thread(target=worker, args=(i,)) for i in range(7)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- with p.connection() as conn:
- conn.close()
- p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
+ with pool.ConnectionPool(dsn, min_size=3) as p:
+ p.wait(2.0)
+
+ ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+ for t in ts:
+ t.start()
+ for t in ts:
+ t.join()
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ with p.connection() as conn:
+ conn.close()
+ p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_concurrent_filling(dsn, monkeypatch, retries):
+async def test_concurrent_filling(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
async def add_time(self, conn):
add_orig = pool.AsyncConnectionPool._add_to_pool
monkeypatch.setattr(pool.AsyncConnectionPool, "_add_to_pool", add_time)
- async for retry in retries:
- with retry:
- times: List[float] = []
- t0 = time()
+ times: List[float] = []
+ t0 = time()
- async with pool.AsyncConnectionPool(
- dsn, min_size=5, num_workers=2
- ) as p:
- await p.wait(1.0)
- want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
- assert len(times) == len(want_times)
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ async with pool.AsyncConnectionPool(dsn, min_size=5, num_workers=2) as p:
+ await p.wait(1.0)
+ want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+ assert len(times) == len(want_times)
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue(dsn, retries):
+async def test_queue(dsn):
async def worker(n):
t0 = time()
async with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
- await p.wait()
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
+ results: List[Tuple[int, float, int]] = []
+ async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
+ await p.wait()
+ ts = [create_task(worker(i)) for i in range(6)]
+ await asyncio.gather(*ts)
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set(r[2] for r in results)) == 2, results
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue_timeout(dsn, retries):
+async def test_queue_timeout(dsn):
async def worker(n):
t0 = time()
try:
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- async with pool.AsyncConnectionPool(
- dsn, min_size=2, timeout=0.1
- ) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_queue_timeout_override(dsn, retries):
+async def test_queue_timeout_override(dsn):
async def worker(n):
t0 = time()
timeout = 0.25 if n == 3 else None
t1 = time()
results.append((n, t1 - t0, pid))
- async for retry in retries:
- with retry:
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
- async with pool.AsyncConnectionPool(
- dsn, min_size=2, timeout=0.1
- ) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+ ts = [create_task(worker(i)) for i in range(4)]
+ await asyncio.gather(*ts)
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
async def test_broken_reconnect(dsn):
(0, [0.35, 0.45, 0.55, 0.60, 0.65, 0.70, 0.80, 0.85]),
],
)
-async def test_grow(dsn, monkeypatch, retries, min_size, want_times):
+async def test_grow(dsn, monkeypatch, min_size, want_times):
delay_connection(monkeypatch, 0.1)
async def worker(n):
t1 = time()
results.append((n, t1 - t0))
- async for retry in retries:
- with retry:
- async with pool.AsyncConnectionPool(
- dsn, min_size=min_size, max_size=4, num_workers=3
- ) as p:
- await p.wait(1.0)
- ts = []
- results: List[Tuple[int, float]] = []
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=min_size, max_size=4, num_workers=3
+ ) as p:
+ await p.wait(1.0)
+ ts = []
+ results: List[Tuple[int, float]] = []
- ts = [create_task(worker(i)) for i in range(len(want_times))]
- await asyncio.gather(*ts)
+ ts = [create_task(worker(i)) for i in range(len(want_times))]
+ await asyncio.gather(*ts)
- times = [item[1] for item in results]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ times = [item[1] for item in results]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
-async def test_reconnect(proxy, caplog, monkeypatch, retries):
+async def test_reconnect(proxy, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
- async for retry in retries:
- with retry:
- caplog.clear()
- proxy.start()
- async with pool.AsyncConnectionPool(
- proxy.client_dsn, min_size=1
- ) as p:
- await p.wait(2.0)
- proxy.stop()
-
- with pytest.raises(psycopg.OperationalError):
- async with p.connection() as conn:
- await conn.execute("select 1")
-
- await asyncio.sleep(1.0)
- proxy.start()
- await p.wait()
-
- async with p.connection() as conn:
- await conn.execute("select 1")
-
- assert "BAD" in caplog.messages[0]
- times = [rec.created for rec in caplog.records]
- assert times[1] - times[0] < 0.05
- deltas = [
- times[i + 1] - times[i] for i in range(1, len(times) - 1)
- ]
- assert len(deltas) == 3
- want = 0.1
- for delta in deltas:
- assert delta == pytest.approx(want, 0.05), deltas
- want *= 2
+ caplog.clear()
+ proxy.start()
+ async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=1) as p:
+ await p.wait(2.0)
+ proxy.stop()
+
+ with pytest.raises(psycopg.OperationalError):
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+ await asyncio.sleep(1.0)
+ proxy.start()
+ await p.wait()
+
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+ assert "BAD" in caplog.messages[0]
+ times = [rec.created for rec in caplog.records]
+ assert times[1] - times[0] < 0.05
+ deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
@pytest.mark.slow
@pytest.mark.slow
-async def test_uniform_use(dsn, retries):
- async for retry in retries:
- with retry:
- async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
- counts = Counter[int]()
- for i in range(8):
- async with p.connection() as conn:
- await asyncio.sleep(0.1)
- counts[id(conn)] += 1
+async def test_uniform_use(dsn):
+ async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
+ counts = Counter[int]()
+ for i in range(8):
+ async with p.connection() as conn:
+ await asyncio.sleep(0.1)
+ counts[id(conn)] += 1
- assert len(counts) == 4
- assert set(counts.values()) == set([2])
+ assert len(counts) == 4
+ assert set(counts.values()) == set([2])
@pytest.mark.slow
@pytest.mark.slow
@pytest.mark.timing
-async def test_stats_usage(dsn, retries):
+async def test_stats_usage(dsn):
async def worker(n):
try:
async with p.connection(timeout=0.3) as conn:
except pool.PoolTimeout:
pass
- async for retry in retries:
- with retry:
- async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
- await p.wait(2.0)
-
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.gather(*ts)
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- async with p.connection() as conn:
- await conn.close()
- await p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- async with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
+ async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
+ await p.wait(2.0)
+
+ ts = [create_task(worker(i)) for i in range(7)]
+ await asyncio.gather(*ts)
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ async with p.connection() as conn:
+ await conn.close()
+ await p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ async with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
@pytest.mark.slow
@pytest.mark.slow
-def test_concurrent_execution(dsn, retries):
+def test_concurrent_execution(dsn):
def worker():
cnn = psycopg.connect(dsn)
cur = cnn.cursor()
cur.close()
cnn.close()
- for retry in retries:
- with retry:
- t1 = threading.Thread(target=worker)
- t2 = threading.Thread(target=worker)
- t0 = time.time()
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- assert time.time() - t0 < 0.8, "something broken in concurrency"
+ t1 = threading.Thread(target=worker)
+ t2 = threading.Thread(target=worker)
+ t0 = time.time()
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
@pytest.mark.slow
@pytest.mark.slow
-def test_cancel(conn, retries):
+def test_cancel(conn):
def canceller():
try:
time.sleep(0.5)
except Exception as exc:
errors.append(exc)
- for retry in retries:
- with retry:
- errors: List[Exception] = []
+ errors: List[Exception] = []
- cur = conn.cursor()
- t = threading.Thread(target=canceller)
- t0 = time.time()
- t.start()
+ cur = conn.cursor()
+ t = threading.Thread(target=canceller)
+ t0 = time.time()
+ t.start()
- with pytest.raises(psycopg.DatabaseError):
- cur.execute("select pg_sleep(2)")
+ with pytest.raises(psycopg.DatabaseError):
+ cur.execute("select pg_sleep(2)")
- t1 = time.time()
- assert not errors
- assert 0.0 < t1 - t0 < 1.0
+ t1 = time.time()
+ assert not errors
+ assert 0.0 < t1 - t0 < 1.0
- # still working
- conn.rollback()
- assert cur.execute("select 1").fetchone()[0] == 1
+ # still working
+ conn.rollback()
+ assert cur.execute("select 1").fetchone()[0] == 1
- t.join()
+ t.join()
@pytest.mark.slow
-def test_identify_closure(dsn, retries):
+def test_identify_closure(dsn):
def closer():
time.sleep(0.2)
conn2.execute(
"select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
)
- for retry in retries:
- with retry:
- conn = psycopg.connect(dsn)
- conn2 = psycopg.connect(dsn)
- try:
- t = threading.Thread(target=closer)
- t.start()
- t0 = time.time()
- try:
- with pytest.raises(psycopg.OperationalError):
- conn.execute("select pg_sleep(1.0)")
- t1 = time.time()
- assert 0.2 < t1 - t0 < 0.4
- finally:
- t.join()
- finally:
- conn.close()
- conn2.close()
+ conn = psycopg.connect(dsn)
+ conn2 = psycopg.connect(dsn)
+ try:
+ t = threading.Thread(target=closer)
+ t.start()
+ t0 = time.time()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select pg_sleep(1.0)")
+ t1 = time.time()
+ assert 0.2 < t1 - t0 < 0.4
+ finally:
+ t.join()
+ finally:
+ conn.close()
+ conn2.close()
@pytest.mark.slow
-async def test_concurrent_execution(dsn, retries):
+async def test_concurrent_execution(dsn):
async def worker():
cnn = await psycopg.AsyncConnection.connect(dsn)
cur = cnn.cursor()
await cur.close()
await cnn.close()
- async for retry in retries:
- with retry:
- workers = [worker(), worker()]
- t0 = time.time()
- await asyncio.gather(*workers)
- assert time.time() - t0 < 0.8, "something broken in concurrency"
+ workers = [worker(), worker()]
+ t0 = time.time()
+ await asyncio.gather(*workers)
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
@pytest.mark.slow
@pytest.mark.slow
-async def test_cancel(aconn, retries):
+async def test_cancel(aconn):
async def canceller():
try:
await asyncio.sleep(0.5)
with pytest.raises(psycopg.DatabaseError):
await cur.execute("select pg_sleep(2)")
- async for retry in retries:
- with retry:
- errors: List[Exception] = []
- workers = [worker(), canceller()]
+ errors: List[Exception] = []
+ workers = [worker(), canceller()]
- t0 = time.time()
- await asyncio.gather(*workers)
+ t0 = time.time()
+ await asyncio.gather(*workers)
- t1 = time.time()
- assert not errors
- assert 0.0 < t1 - t0 < 1.0
+ t1 = time.time()
+ assert not errors
+ assert 0.0 < t1 - t0 < 1.0
- # still working
- await aconn.rollback()
- cur = aconn.cursor()
- await cur.execute("select 1")
- assert await cur.fetchone() == (1,)
+ # still working
+ await aconn.rollback()
+ cur = aconn.cursor()
+ await cur.execute("select 1")
+ assert await cur.fetchone() == (1,)
@pytest.mark.slow
-async def test_identify_closure(dsn, retries):
+async def test_identify_closure(dsn):
async def closer():
await asyncio.sleep(0.2)
await conn2.execute(
"select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
)
- async for retry in retries:
- with retry:
- aconn = await psycopg.AsyncConnection.connect(dsn)
- conn2 = await psycopg.AsyncConnection.connect(dsn)
- try:
- t = create_task(closer())
- t0 = time.time()
- try:
- with pytest.raises(psycopg.OperationalError):
- await aconn.execute("select pg_sleep(1.0)")
- t1 = time.time()
- assert 0.2 < t1 - t0 < 0.4
- finally:
- await asyncio.gather(t)
- finally:
- await aconn.close()
- await conn2.close()
+ aconn = await psycopg.AsyncConnection.connect(dsn)
+ conn2 = await psycopg.AsyncConnection.connect(dsn)
+ try:
+ t = create_task(closer())
+ t0 = time.time()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ await aconn.execute("select pg_sleep(1.0)")
+ t1 = time.time()
+ assert 0.2 < t1 - t0 < 0.4
+ finally:
+ await asyncio.gather(t)
+ finally:
+ await aconn.close()
+ await conn2.close()
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
+def test_copy_to_leaks(dsn, faker, fmt, set_types, method):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
list(copy.rows())
gc_collect()
- for retry in retries:
- with retry:
- n = []
- for i in range(3):
- work()
- gc_collect()
- n.append(len(gc.get_objects()))
+ n = []
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
-def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
+def test_copy_from_leaks(dsn, faker, fmt, set_types):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
faker.assert_record(got, want)
gc_collect()
- for retry in retries:
- with retry:
- n = []
- for i in range(3):
- work()
- gc_collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ n = []
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
def py_to_raw(item, fmt):
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-async def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
+async def test_copy_to_leaks(dsn, faker, fmt, set_types, method):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
await alist(copy.rows())
gc_collect()
- async for retry in retries:
- with retry:
- n = []
- for i in range(3):
- await work()
- gc_collect()
- n.append(len(gc.get_objects()))
+ n = []
+ for i in range(3):
+ await work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
-async def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
+async def test_copy_from_leaks(dsn, faker, fmt, set_types):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
faker.assert_record(got, want)
gc_collect()
- async for retry in retries:
- with retry:
- n = []
- for i in range(3):
- await work()
- gc_collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ n = []
+ for i in range(3):
+ await work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
async def ensure_table(cur, tabledef, name="copy_in"):
@pytest.mark.parametrize(
"row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
)
-def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
+def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
for rec in cur:
pass
- for retry in retries:
- with retry:
- n = []
- gc_collect()
- for i in range(3):
- work()
- gc_collect()
- n.append(len(gc.get_objects()))
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ n = []
+ gc_collect()
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
def my_row_factory(
@pytest.mark.parametrize(
"row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
)
-async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
+async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
async for rec in cur:
pass
- async for retry in retries:
- with retry:
- n = []
- gc_collect()
- for i in range(3):
- await work()
- gc_collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ n = []
+ gc_collect()
+ for i in range(3):
+ await work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.parametrize("conninfo, want, env", samples_ok)
-def test_srv(conninfo, want, env, fake_srv, retries, monkeypatch):
+def test_srv(conninfo, want, env, fake_srv, monkeypatch):
if env:
for k, v in env.items():
monkeypatch.setenv(k, v)
- # retries are needed because weight order is random, although wrong order
- # is unlikely.
- for retry in retries:
- with retry:
- params = conninfo_to_dict(conninfo)
- params = psycopg._dns.resolve_srv(params) # type: ignore[attr-defined]
- assert conninfo_to_dict(want) == params
+ # Note: This test is flaky because weight order is random, although wrong
+ # order is unlikely.
+ params = conninfo_to_dict(conninfo)
+ params = psycopg._dns.resolve_srv(params) # type: ignore[attr-defined]
+ assert conninfo_to_dict(want) == params
@pytest.mark.asyncio
@pytest.mark.parametrize("conninfo, want, env", samples_ok)
-async def test_srv_async(conninfo, want, env, afake_srv, retries, monkeypatch):
+async def test_srv_async(conninfo, want, env, afake_srv, monkeypatch):
if env:
for k, v in env.items():
monkeypatch.setenv(k, v)
- async for retry in retries:
- with retry:
- params = conninfo_to_dict(conninfo)
- params = await (
- psycopg._dns.resolve_srv_async( # type: ignore[attr-defined]
- params
- )
- )
- assert conninfo_to_dict(want) == params
+ params = conninfo_to_dict(conninfo)
+ params = await (
+ psycopg._dns.resolve_srv_async(params) # type: ignore[attr-defined]
+ )
+ assert conninfo_to_dict(want) == params
samples_bad = [
cur.close()
-def test_close(conn, recwarn, retries):
- for retry in retries:
- with retry:
- if conn.info.transaction_status == conn.TransactionStatus.INTRANS:
- # connection dirty from previous failure
- conn.execute("close foo")
- recwarn.clear()
- cur = conn.cursor("foo")
- cur.execute("select generate_series(1, 10) as bar")
- cur.close()
- assert cur.closed
-
- assert not conn.execute(
- "select * from pg_cursors where name = 'foo'"
- ).fetchone()
- del cur
- assert not recwarn, [str(w.message) for w in recwarn.list]
+def test_close(conn, recwarn):
+ if conn.info.transaction_status == conn.TransactionStatus.INTRANS:
+ # connection dirty from previous failure
+ conn.execute("close foo")
+ recwarn.clear()
+ cur = conn.cursor("foo")
+ cur.execute("select generate_series(1, 10) as bar")
+ cur.close()
+ assert cur.closed
+
+ assert not conn.execute(
+ "select * from pg_cursors where name = 'foo'"
+ ).fetchone()
+ del cur
+ assert not recwarn, [str(w.message) for w in recwarn.list]
def test_close_idempotent(conn):
cur.fetchall()
-def test_close_noop(conn, recwarn, retries):
- for retry in retries:
- with retry:
- recwarn.clear()
- cur = conn.cursor("foo")
- cur.close()
- assert not recwarn, [str(w.message) for w in recwarn.list]
+def test_close_noop(conn, recwarn):
+ recwarn.clear()
+ cur = conn.cursor("foo")
+ cur.close()
+ assert not recwarn, [str(w.message) for w in recwarn.list]
def test_close_on_error(conn):
assert not cur.pgresult
-def test_context(conn, recwarn, retries):
- for retry in retries:
- with retry:
- recwarn.clear()
- with conn.cursor("foo") as cur:
- cur.execute("select generate_series(1, 10) as bar")
+def test_context(conn, recwarn):
+ recwarn.clear()
+ with conn.cursor("foo") as cur:
+ cur.execute("select generate_series(1, 10) as bar")
- assert cur.closed
- assert not conn.execute(
- "select * from pg_cursors where name = 'foo'"
- ).fetchone()
- del cur
- assert not recwarn, [str(w.message) for w in recwarn.list]
+ assert cur.closed
+ assert not conn.execute(
+ "select * from pg_cursors where name = 'foo'"
+ ).fetchone()
+ del cur
+ assert not recwarn, [str(w.message) for w in recwarn.list]
def test_close_no_clobber(conn):
cur.execute("select 1 / %s", (0,))
-def test_warn_close(conn, recwarn, retries):
- for retry in retries:
- with retry:
- recwarn.clear()
- cur = conn.cursor("foo")
- cur.execute("select generate_series(1, 10) as bar")
- del cur
- assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+def test_warn_close(conn, recwarn):
+ recwarn.clear()
+ cur = conn.cursor("foo")
+ cur.execute("select generate_series(1, 10) as bar")
+ del cur
+ assert ".close()" in str(recwarn.pop(ResourceWarning).message)
def test_execute_reuse(conn):
await cur.close()
-async def test_close(aconn, recwarn, retries):
- async for retry in retries:
- with retry:
- if (
- aconn.info.transaction_status
- == aconn.TransactionStatus.INTRANS
- ):
- # connection dirty from previous failure
- await aconn.execute("close foo")
- recwarn.clear()
- cur = aconn.cursor("foo")
- await cur.execute("select generate_series(1, 10) as bar")
- await cur.close()
- assert cur.closed
-
- assert not await (
- await aconn.execute(
- "select * from pg_cursors where name = 'foo'"
- )
- ).fetchone()
- del cur
- assert not recwarn, [str(w.message) for w in recwarn.list]
+async def test_close(aconn, recwarn):
+ if aconn.info.transaction_status == aconn.TransactionStatus.INTRANS:
+ # connection dirty from previous failure
+ await aconn.execute("close foo")
+ recwarn.clear()
+ cur = aconn.cursor("foo")
+ await cur.execute("select generate_series(1, 10) as bar")
+ await cur.close()
+ assert cur.closed
+
+ assert not await (
+ await aconn.execute("select * from pg_cursors where name = 'foo'")
+ ).fetchone()
+ del cur
+ assert not recwarn, [str(w.message) for w in recwarn.list]
async def test_close_idempotent(aconn):
await cur.fetchall()
-async def test_close_noop(aconn, recwarn, retries):
- async for retry in retries:
- with retry:
- recwarn.clear()
- cur = aconn.cursor("foo")
- await cur.close()
- assert not recwarn, [str(w.message) for w in recwarn.list]
+async def test_close_noop(aconn, recwarn):
+ recwarn.clear()
+ cur = aconn.cursor("foo")
+ await cur.close()
+ assert not recwarn, [str(w.message) for w in recwarn.list]
async def test_close_on_error(aconn):
assert not cur.pgresult
-async def test_context(aconn, recwarn, retries):
- async for retry in retries:
- with retry:
- recwarn.clear()
- async with aconn.cursor("foo") as cur:
- await cur.execute("select generate_series(1, 10) as bar")
+async def test_context(aconn, recwarn):
+ recwarn.clear()
+ async with aconn.cursor("foo") as cur:
+ await cur.execute("select generate_series(1, 10) as bar")
- assert cur.closed
- assert not await (
- await aconn.execute(
- "select * from pg_cursors where name = 'foo'"
- )
- ).fetchone()
- del cur
- assert not recwarn, [str(w.message) for w in recwarn.list]
+ assert cur.closed
+ assert not await (
+ await aconn.execute("select * from pg_cursors where name = 'foo'")
+ ).fetchone()
+ del cur
+ assert not recwarn, [str(w.message) for w in recwarn.list]
async def test_close_no_clobber(aconn):
await cur.execute("select 1 / %s", (0,))
-async def test_warn_close(aconn, recwarn, retries):
- async for retry in retries:
- with retry:
- recwarn.clear()
- cur = aconn.cursor("foo")
- await cur.execute("select generate_series(1, 10) as bar")
- del cur
- assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+async def test_warn_close(aconn, recwarn):
+ recwarn.clear()
+ cur = aconn.cursor("foo")
+ await cur.execute("select generate_series(1, 10) as bar")
+ del cur
+ assert ".close()" in str(recwarn.pop(ResourceWarning).message)
async def test_execute_reuse(aconn):
@pytest.mark.parametrize("timeout", timeouts)
-def test_wait_conn(dsn, timeout, retries):
- for retry in retries:
- with retry:
- gen = generators.connect(dsn)
- conn = waiting.wait_conn(gen, **timeout)
- assert conn.status == ConnStatus.OK
+def test_wait_conn(dsn, timeout):
+ gen = generators.connect(dsn)
+ conn = waiting.wait_conn(gen, **timeout)
+ assert conn.status == ConnStatus.OK
def test_wait_conn_bad(dsn):