pytestmark = [pytest.mark.anyio]
try:
- from psycopg_pool import AsyncNullConnectionPool # noqa: F401
- from psycopg_pool import PoolClosed, PoolTimeout, TooManyRequests
+ import psycopg_pool as pool
except ImportError:
pass
-async def test_defaults(dsn):
- async with AsyncNullConnectionPool(dsn) as p:
+async def test_default_sizes(dsn):
+ async with pool.AsyncNullConnectionPool(dsn) as p:
assert p.min_size == p.max_size == 0
- assert p.timeout == 30
- assert p.max_idle == 10 * 60
- assert p.max_lifetime == 60 * 60
- assert p.num_workers == 3
async def test_min_size_max_size(dsn):
- async with AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p:
+ async with pool.AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p:
assert p.min_size == 0
assert p.max_size == 2
@pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)])
async def test_bad_size(dsn, min_size, max_size):
with pytest.raises(ValueError):
- AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
-
-
-async def test_connection_class(dsn):
- class MyConn(psycopg.AsyncConnection[Any]):
- pass
-
- async with AsyncNullConnectionPool(dsn, connection_class=MyConn) as p:
- async with p.connection() as conn:
- assert isinstance(conn, MyConn)
-
-
-async def test_kwargs(dsn):
- async with AsyncNullConnectionPool(dsn, kwargs={"autocommit": True}) as p:
- async with p.connection() as conn:
- assert conn.autocommit
+ pool.AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
class MyRow(Dict[str, Any]):
class MyConnection(psycopg.AsyncConnection[Row]):
pass
- async with AsyncNullConnectionPool(
+ async with pool.AsyncNullConnectionPool(
dsn,
connection_class=MyConnection[MyRow],
kwargs={"row_factory": class_row(MyRow)},
async with p1.connection() as conn1:
cur1 = await conn1.execute("select 1 as x")
(row1,) = await cur1.fetchall()
- assert_type(p1, AsyncNullConnectionPool[MyConnection[MyRow]])
+ assert_type(p1, pool.AsyncNullConnectionPool[MyConnection[MyRow]])
assert_type(conn1, MyConnection[MyRow])
assert_type(row1, MyRow)
assert conn1.autocommit
assert row1 == {"x": 1}
- async with AsyncNullConnectionPool(
+ async with pool.AsyncNullConnectionPool(
dsn, connection_class=MyConnection[TupleRow]
) as p2:
async with p2.connection() as conn2:
cur2 = await conn2.execute("select 2 as y")
(row2,) = await cur2.fetchall()
- assert_type(p2, AsyncNullConnectionPool[MyConnection[TupleRow]])
+ assert_type(p2, pool.AsyncNullConnectionPool[MyConnection[TupleRow]])
assert_type(conn2, MyConnection[TupleRow])
assert_type(row2, TupleRow)
assert row2 == (2,)
kwargs["row_factory"] = class_row(MyRow)
super().__init__(*args, **kwargs)
- async with AsyncNullConnectionPool(
+ async with pool.AsyncNullConnectionPool(
dsn, connection_class=MyConnection, configure=set_autocommit
) as p1:
async with p1.connection() as conn1:
(row1,) = await (await conn1.execute("select 1 as x")).fetchall()
- assert_type(p1, AsyncNullConnectionPool[MyConnection])
+ assert_type(p1, pool.AsyncNullConnectionPool[MyConnection])
assert_type(conn1, MyConnection)
assert_type(row1, MyRow)
assert conn1.autocommit
@pytest.mark.crdb_skip("backend pid")
async def test_its_no_pool_at_all(dsn):
- async with AsyncNullConnectionPool(dsn, max_size=2) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=2) as p:
async with p.connection() as conn:
pid1 = conn.info.backend_pid
assert conn.info.backend_pid not in (pid1, pid2)
-async def test_context(dsn):
- async with AsyncNullConnectionPool(dsn) as p:
- assert not p.closed
- assert p.closed
-
-
@pytest.mark.slow
@pytest.mark.timing
async def test_wait_ready(dsn, monkeypatch):
delay_connection(monkeypatch, 0.2)
- with pytest.raises(PoolTimeout):
- async with AsyncNullConnectionPool(dsn, num_workers=1) as p:
+ with pytest.raises(pool.PoolTimeout):
+ async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p:
await p.wait(0.1)
- async with AsyncNullConnectionPool(dsn, num_workers=1) as p:
+ async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p:
await p.wait(0.4)
-async def test_wait_closed(dsn):
- async with AsyncNullConnectionPool(dsn) as p:
- pass
-
- with pytest.raises(PoolClosed):
- await p.wait()
-
-
-@pytest.mark.slow
-async def test_setup_no_timeout(dsn, proxy):
- with pytest.raises(PoolTimeout):
- async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p:
- await p.wait(0.2)
-
- async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p:
- await asyncio.sleep(0.5)
- assert not p._pool
- proxy.start()
-
- async with p.connection() as conn:
- await conn.execute("select 1")
-
-
async def test_configure(dsn):
inits = 0
async with conn.transaction():
await conn.execute("set default_transaction_read_only to on")
- async with AsyncNullConnectionPool(dsn, configure=configure) as p:
+ async with pool.AsyncNullConnectionPool(dsn, configure=configure) as p:
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
assert (await res.fetchone())[0] == "on" # type: ignore[index]
-@pytest.mark.slow
-async def test_configure_badstate(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- async def configure(conn):
- await conn.execute("select 1")
-
- async with AsyncNullConnectionPool(dsn, configure=configure) as p:
- with pytest.raises(PoolTimeout):
- await p.wait(timeout=0.5)
-
- assert caplog.records
- assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-async def test_configure_broken(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- async def configure(conn):
- async with conn.transaction():
- await conn.execute("WAT")
-
- async with AsyncNullConnectionPool(dsn, configure=configure) as p:
- with pytest.raises(PoolTimeout):
- await p.wait(timeout=0.5)
-
- assert caplog.records
- assert "WAT" in caplog.records[0].message
-
-
@pytest.mark.crdb_skip("backend pid")
async def test_reset(dsn):
resets = 0
assert (await cur.fetchone()) == ("UTC",)
pids.append(conn.info.backend_pid)
- async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
# Queue the worker so it will take the same connection a second time
# instead of making a new one.
await conn.execute("select 1")
pids.append(conn.info.backend_pid)
- async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
t = create_task(worker())
await ensure_waiting(p)
await conn.execute("select 1")
pids.append(conn.info.backend_pid)
- async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
t = create_task(worker())
await ensure_waiting(p)
@pytest.mark.slow
@pytest.mark.skipif("ver(psycopg.__version__) < ver('3.0.8')")
async def test_no_queue_timeout(deaf_port):
- async with AsyncNullConnectionPool(
+ async with pool.AsyncNullConnectionPool(
kwargs={"host": "localhost", "port": deaf_port}
) as p:
- with pytest.raises(PoolTimeout):
+ with pytest.raises(pool.PoolTimeout):
async with p.connection(timeout=1):
pass
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue(dsn):
- async def worker(n):
- t0 = time()
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- async with AsyncNullConnectionPool(dsn, max_size=2) as p:
- await p.wait()
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
-
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
-
- assert len(set(r[2] for r in results)) == 2, results
-
-
-@pytest.mark.slow
-async def test_queue_size(dsn):
- async def worker(t, ev=None):
- try:
- async with p.connection():
- if ev:
- ev.set()
- await asyncio.sleep(t)
- except TooManyRequests as e:
- errors.append(e)
- else:
- success.append(True)
-
- errors: List[Exception] = []
- success: List[bool] = []
-
- async with AsyncNullConnectionPool(dsn, max_size=1, max_waiting=3) as p:
- await p.wait()
- ev = asyncio.Event()
- create_task(worker(0.3, ev))
- await ev.wait()
-
- ts = [create_task(worker(0.1)) for i in range(4)]
- await asyncio.gather(*ts)
-
- assert len(success) == 4
- assert len(errors) == 1
- assert isinstance(errors[0], TooManyRequests)
- assert p.name in str(errors[0])
- assert str(p.max_waiting) in str(errors[0])
- assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout(dsn):
- async def worker(n):
- t0 = time()
- try:
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
-
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_dead_client(dsn):
- async def worker(i, timeout):
- try:
- async with p.connection(timeout=timeout) as conn:
- await conn.execute("select pg_sleep(0.3)")
- results.append(i)
- except PoolTimeout:
- if timeout > 0.2:
- raise
-
- async with AsyncNullConnectionPool(dsn, max_size=2) as p:
- results: List[int] = []
- ts = [
- create_task(worker(i, timeout))
- for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
- ]
- await asyncio.gather(*ts)
-
- await asyncio.sleep(0.2)
- assert set(results) == set([0, 1, 3, 4])
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout_override(dsn):
- async def worker(n):
- t0 = time()
- timeout = 0.25 if n == 3 else None
- try:
- async with p.connection(timeout=timeout) as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
-
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-async def test_broken_reconnect(dsn):
- async with AsyncNullConnectionPool(dsn, max_size=1) as p:
- async with p.connection() as conn:
- pid1 = conn.info.backend_pid
- await conn.close()
-
- async with p.connection() as conn2:
- pid2 = conn2.info.backend_pid
-
- assert pid1 != pid2
-
-
@pytest.mark.crdb_skip("backend pid")
async def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
)
assert not await cur.fetchone()
- async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
conn = await p.getconn()
# Queue the worker so it will take the connection a second time instead
pids.append(conn.info.backend_pid)
assert conn.info.transaction_status == TransactionStatus.IDLE
- async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
conn = await p.getconn()
t = create_task(worker())
pids.append(conn.info.backend_pid)
assert conn.info.transaction_status == TransactionStatus.IDLE
- async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
conn = await p.getconn()
t = create_task(worker())
pids.append(conn.info.backend_pid)
assert conn.info.transaction_status == TransactionStatus.IDLE
- async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
conn = await p.getconn()
t = create_task(worker())
await ensure_waiting(p)
assert "BAD" in caplog.records[2].message
-async def test_close_no_tasks(dsn):
- p = AsyncNullConnectionPool(dsn)
- assert p._sched_runner and not p._sched_runner.done()
- assert p._workers
- workers = p._workers[:]
- for t in workers:
- assert not t.done()
-
- await p.close()
- assert p._sched_runner is None
- assert not p._workers
- for t in workers:
- assert t.done()
-
-
-async def test_putconn_no_pool(aconn_cls, dsn):
- async with AsyncNullConnectionPool(dsn) as p:
- conn = await aconn_cls.connect(dsn)
- with pytest.raises(ValueError):
- await p.putconn(conn)
-
- await conn.close()
-
-
-async def test_putconn_wrong_pool(dsn):
- async with AsyncNullConnectionPool(dsn) as p1:
- async with AsyncNullConnectionPool(dsn) as p2:
- conn = await p1.getconn()
- with pytest.raises(ValueError):
- await p2.putconn(conn)
-
-
-async def test_closed_getconn(dsn):
- p = AsyncNullConnectionPool(dsn)
- assert not p.closed
- async with p.connection():
- pass
-
- await p.close()
- assert p.closed
-
- with pytest.raises(PoolClosed):
- async with p.connection():
- pass
-
-
async def test_closed_putconn(dsn):
- p = AsyncNullConnectionPool(dsn)
-
- async with p.connection() as conn:
- pass
- assert conn.closed
-
- async with p.connection() as conn:
- await p.close()
- assert conn.closed
-
-
-async def test_closed_queue(dsn):
- async def w1():
+ async with pool.AsyncNullConnectionPool(dsn) as p:
async with p.connection() as conn:
- e1.set() # Tell w0 that w1 got a connection
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
- await e2.wait() # Wait until w0 has tested w2
- success.append("w1")
-
- async def w2():
- try:
- async with p.connection():
- pass # unexpected
- except PoolClosed:
- success.append("w2")
-
- e1 = asyncio.Event()
- e2 = asyncio.Event()
-
- p = AsyncNullConnectionPool(dsn, max_size=1)
- await p.wait()
- success: List[str] = []
-
- t1 = create_task(w1())
- # Wait until w1 has received a connection
- await e1.wait()
-
- t2 = create_task(w2())
- # Wait until w2 is in the queue
- await ensure_waiting(p)
- await p.close()
-
- # Wait for the workers to finish
- e2.set()
- await asyncio.gather(t1, t2)
- assert len(success) == 2
-
-
-async def test_open_explicit(dsn):
- p = AsyncNullConnectionPool(dsn, open=False)
- assert p.closed
- with pytest.raises(PoolClosed):
- await p.getconn()
-
- with pytest.raises(PoolClosed, match="is not open yet"):
- async with p.connection():
pass
-
- await p.open()
- try:
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- finally:
- await p.close()
-
- with pytest.raises(PoolClosed, match="is already closed"):
- await p.getconn()
-
-
-async def test_open_context(dsn):
- p = AsyncNullConnectionPool(dsn, open=False)
- assert p.closed
-
- async with p:
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- assert p.closed
-
-
-async def test_open_no_op(dsn):
- p = AsyncNullConnectionPool(dsn)
- try:
- assert not p.closed
- await p.open()
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- finally:
- await p.close()
-
-
-async def test_reopen(dsn):
- p = AsyncNullConnectionPool(dsn)
- async with p.connection() as conn:
- await conn.execute("select 1")
- await p.close()
- assert p._sched_runner is None
-
- with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
- await p.open()
+ assert conn.closed
@pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)])
async def test_bad_resize(dsn, min_size, max_size):
- async with AsyncNullConnectionPool() as p:
+ async with pool.AsyncNullConnectionPool() as p:
with pytest.raises(ValueError):
await p.resize(min_size=min_size, max_size=max_size)
pids.append(conn.info.backend_pid)
await asyncio.sleep(0.1)
- async with AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p:
+ async with pool.AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p:
ts = [create_task(worker()) for i in range(5)]
await asyncio.gather(*ts)
async def test_check(dsn):
# no.op
- async with AsyncNullConnectionPool(dsn) as p:
+ async with pool.AsyncNullConnectionPool(dsn) as p:
await p.check()
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_measures(dsn):
- async def worker(n):
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
-
- async with AsyncNullConnectionPool(dsn, max_size=4) as p:
- await p.wait(2.0)
-
- stats = p.get_stats()
- assert stats["pool_min"] == 0
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 0
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 0
-
- ts = [create_task(worker(i)) for i in range(3)]
- await asyncio.sleep(0.1)
- stats = p.get_stats()
- await asyncio.gather(*ts)
- assert stats["pool_min"] == 0
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 3
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 0
-
- await p.wait(2.0)
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.sleep(0.1)
- stats = p.get_stats()
- await asyncio.gather(*ts)
- assert stats["pool_min"] == 0
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 4
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_usage(dsn):
- async def worker(n):
- try:
- async with p.connection(timeout=0.3) as conn:
- await conn.execute("select pg_sleep(0.2)")
- except PoolTimeout:
- pass
-
- async with AsyncNullConnectionPool(dsn, max_size=3) as p:
- await p.wait(2.0)
-
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.gather(*ts)
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- async with p.connection() as conn:
- await conn.close()
- await p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- async with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
-
-
@pytest.mark.slow
async def test_stats_connect(dsn, proxy, monkeypatch):
proxy.start()
delay_connection(monkeypatch, 0.2)
- async with AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p:
+ async with pool.AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p:
await p.wait()
stats = p.get_stats()
assert stats["connections_num"] == 1
nconns = 3
- async with AsyncNullConnectionPool(
+ async with pool.AsyncNullConnectionPool(
dsn, min_size=0, max_size=nconns, timeout=1
) as p:
await p.wait()
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type, Counter
-from ..utils import Event, spawn, gather, sleep, is_alive, is_async
+from ..utils import Event, spawn, gather, sleep, is_async
try:
import psycopg_pool as pool
pass
-def test_defaults(dsn):
+def test_default_sizes(dsn):
with pool.ConnectionPool(dsn) as p:
assert p.min_size == p.max_size == 4
- assert p.timeout == 30
- assert p.max_idle == 10 * 60
- assert p.max_lifetime == 60 * 60
- assert p.num_workers == 3
@pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)])
pool.ConnectionPool(min_size=min_size, max_size=max_size)
-def test_connection_class(dsn):
- class MyConn(psycopg.Connection[Any]):
- pass
-
- with pool.ConnectionPool(dsn, connection_class=MyConn, min_size=1) as p:
- with p.connection() as conn:
- assert isinstance(conn, MyConn)
-
-
-def test_kwargs(dsn):
- with pool.ConnectionPool(dsn, kwargs={"autocommit": True}, min_size=1) as p:
- with p.connection() as conn:
- assert conn.autocommit
-
-
class MyRow(Dict[str, Any]):
...
assert conn.info.backend_pid in (pid1, pid2)
-def test_context(dsn):
- with pool.ConnectionPool(dsn, min_size=1) as p:
- assert not p.closed
- assert p.closed
-
-
@pytest.mark.crdb_skip("backend pid")
def test_connection_not_lost(dsn):
with pool.ConnectionPool(dsn, min_size=1) as p:
p.wait(0.0001) # idempotent
-def test_wait_closed(dsn):
- with pool.ConnectionPool(dsn) as p:
- pass
-
- with pytest.raises(pool.PoolClosed):
- p.wait()
-
-
-@pytest.mark.slow
-def test_setup_no_timeout(dsn, proxy):
- with pytest.raises(pool.PoolTimeout):
- with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p:
- p.wait(0.2)
-
- with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p:
- sleep(0.5)
- assert not p._pool
- proxy.start()
-
- with p.connection() as conn:
- conn.execute("select 1")
-
-
def test_configure(dsn):
inits = 0
assert res.fetchone()[0] == "on" # type: ignore[index]
-@pytest.mark.slow
-def test_configure_badstate(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- def configure(conn):
- conn.execute("select 1")
-
- with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p:
- with pytest.raises(pool.PoolTimeout):
- p.wait(timeout=0.5)
-
- assert caplog.records
- assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-def test_configure_broken(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- def configure(conn):
- with conn.transaction():
- conn.execute("WAT")
-
- with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p:
- with pytest.raises(pool.PoolTimeout):
- p.wait(timeout=0.5)
-
- assert caplog.records
- assert "WAT" in caplog.records[0].message
-
-
def test_reset(dsn):
resets = 0
assert "WAT" in caplog.records[0].message
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue(dsn):
- def worker(n):
- t0 = time()
- with p.connection() as conn:
- conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- with pool.ConnectionPool(dsn, min_size=2) as p:
- p.wait()
- ts = [spawn(worker, args=(i,)) for i in range(6)]
- gather(*ts)
-
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
-
- assert len(set((r[2] for r in results))) == 2, results
-
-
-@pytest.mark.slow
-def test_queue_size(dsn):
- def worker(t, ev=None):
- try:
- with p.connection():
- if ev:
- ev.set()
- sleep(t)
- except pool.TooManyRequests as e:
- errors.append(e)
- else:
- success.append(True)
-
- errors: List[Exception] = []
- success: List[bool] = []
-
- with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p:
- p.wait()
- ev = Event()
- spawn(worker, args=(0.3, ev))
- ev.wait()
-
- ts = [spawn(worker, args=(0.1,)) for i in range(4)]
- gather(*ts)
-
- assert len(success) == 4
- assert len(errors) == 1
- assert isinstance(errors[0], pool.TooManyRequests)
- assert p.name in str(errors[0])
- assert str(p.max_waiting) in str(errors[0])
- assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue_timeout(dsn):
- def worker(n):
- t0 = time()
- try:
- with p.connection() as conn:
- conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except pool.PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [spawn(worker, args=(i,)) for i in range(4)]
- gather(*ts)
-
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-def test_dead_client(dsn):
- def worker(i, timeout):
- try:
- with p.connection(timeout=timeout) as conn:
- conn.execute("select pg_sleep(0.3)")
- results.append(i)
- except pool.PoolTimeout:
- if timeout > 0.2:
- raise
-
- with pool.ConnectionPool(dsn, min_size=2) as p:
- results: List[int] = []
- ts = [
- spawn(worker, args=(i, timeout))
- for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
- ]
- gather(*ts)
-
- sleep(0.2)
- assert set(results) == set([0, 1, 3, 4])
- assert len(p._pool) == 2 # no connection was lost
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue_timeout_override(dsn):
- def worker(n):
- t0 = time()
- timeout = 0.25 if n == 3 else None
- try:
- with p.connection(timeout=timeout) as conn:
- conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except pool.PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [spawn(worker, args=(i,)) for i in range(4)]
- gather(*ts)
-
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-def test_broken_reconnect(dsn):
- with pool.ConnectionPool(dsn, min_size=1) as p:
- with p.connection() as conn:
- pid1 = conn.info.backend_pid
- conn.close()
-
- with p.connection() as conn2:
- pid2 = conn2.info.backend_pid
-
- assert pid1 != pid2
-
-
@pytest.mark.crdb_skip("backend pid")
def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "BAD" in caplog.records[2].message
-def test_close_no_tasks(dsn):
- p = pool.ConnectionPool(dsn)
- assert p._sched_runner and is_alive(p._sched_runner)
- workers = p._workers[:]
- assert workers
- for t in workers:
- assert is_alive(t)
-
- p.close()
- assert p._sched_runner is None
- assert not p._workers
- for t in workers:
- assert not is_alive(t)
-
-
-def test_putconn_no_pool(conn_cls, dsn):
- with pool.ConnectionPool(dsn, min_size=1) as p:
- conn = conn_cls.connect(dsn)
- with pytest.raises(ValueError):
- p.putconn(conn)
-
- conn.close()
-
-
-def test_putconn_wrong_pool(dsn):
- with pool.ConnectionPool(dsn, min_size=1) as p1:
- with pool.ConnectionPool(dsn, min_size=1) as p2:
- conn = p1.getconn()
- with pytest.raises(ValueError):
- p2.putconn(conn)
-
-
def test_del_no_warning(dsn, recwarn):
p = pool.ConnectionPool(dsn, min_size=2)
with p.connection() as conn:
assert not recwarn, [str(w.message) for w in recwarn.list]
-@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
-def test_del_stops_threads(dsn):
- p = pool.ConnectionPool(dsn)
- assert p._sched_runner is not None
- ts = [p._sched_runner] + p._workers
- del p
- sleep(0.1)
- for t in ts:
- assert not is_alive(t), t
-
-
-def test_closed_getconn(dsn):
- p = pool.ConnectionPool(dsn, min_size=1)
- assert not p.closed
- with p.connection():
- pass
-
- p.close()
- assert p.closed
-
- with pytest.raises(pool.PoolClosed):
- with p.connection():
- pass
-
-
def test_closed_putconn(dsn):
- p = pool.ConnectionPool(dsn, min_size=1)
-
- with p.connection() as conn:
- pass
- assert not conn.closed
-
- with p.connection() as conn:
- p.close()
- assert conn.closed
-
-
-def test_closed_queue(dsn):
- def w1():
+ with pool.ConnectionPool(dsn, min_size=1) as p:
with p.connection() as conn:
- e1.set() # Tell w0 that w1 got a connection
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
- e2.wait() # Wait until w0 has tested w2
- success.append("w1")
-
- def w2():
- try:
- with p.connection():
- pass # unexpected
- except pool.PoolClosed:
- success.append("w2")
-
- e1 = Event()
- e2 = Event()
-
- p = pool.ConnectionPool(dsn, min_size=1)
- p.wait()
- success: List[str] = []
-
- t1 = spawn(w1)
- # Wait until w1 has received a connection
- e1.wait()
-
- t2 = spawn(w2)
- # Wait until w2 is in the queue
- ensure_waiting(p)
- p.close()
-
- # Wait for the workers to finish
- e2.set()
- gather(t1, t2)
- assert len(success) == 2
-
-
-def test_open_explicit(dsn):
- p = pool.ConnectionPool(dsn, open=False)
- assert p.closed
- with pytest.raises(pool.PoolClosed, match="is not open yet"):
- p.getconn()
-
- with pytest.raises(pool.PoolClosed, match="is not open yet"):
- with p.connection():
pass
-
- p.open()
- try:
- assert not p.closed
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
- finally:
- p.close()
-
- with pytest.raises(pool.PoolClosed, match="is already closed"):
- p.getconn()
-
-
-def test_open_context(dsn):
- p = pool.ConnectionPool(dsn, open=False)
- assert p.closed
-
- with p:
- assert not p.closed
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
- assert p.closed
-
-
-def test_open_no_op(dsn):
- p = pool.ConnectionPool(dsn)
- try:
- assert not p.closed
- p.open()
- assert not p.closed
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
- finally:
- p.close()
+ assert not conn.closed
@pytest.mark.slow
p.open(wait=True, timeout=0.5)
-def test_reopen(dsn):
- p = pool.ConnectionPool(dsn)
- with p.connection() as conn:
- conn.execute("select 1")
- p.close()
- assert p._sched_runner is None
- assert not p._workers
-
- with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
- p.open()
-
-
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize(
p.resize(min_size=min_size, max_size=max_size)
-def test_jitter():
- rnds = [pool.ConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
- assert 27 <= min(rnds) <= 28
- assert 35 < max(rnds) < 36
-
-
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.crdb_skip("backend pid")
assert conn.info.backend_pid != pid
-@pytest.mark.slow
-@pytest.mark.timing
-def test_stats_measures(dsn):
- def worker(n):
- with p.connection() as conn:
- conn.execute("select pg_sleep(0.2)")
-
- with pool.ConnectionPool(dsn, min_size=2, max_size=4) as p:
- p.wait(2.0)
-
- stats = p.get_stats()
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 2
- assert stats["pool_available"] == 2
- assert stats["requests_waiting"] == 0
-
- ts = [spawn(worker, args=(i,)) for i in range(3)]
- sleep(0.1)
- stats = p.get_stats()
- gather(*ts)
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 3
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 0
-
- p.wait(2.0)
- ts = [spawn(worker, args=(i,)) for i in range(7)]
- sleep(0.1)
- stats = p.get_stats()
- gather(*ts)
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 4
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-def test_stats_usage(dsn):
- def worker(n):
- try:
- with p.connection(timeout=0.3) as conn:
- conn.execute("select pg_sleep(0.2)")
- except pool.PoolTimeout:
- pass
-
- with pool.ConnectionPool(dsn, min_size=3) as p:
- p.wait(2.0)
-
- ts = [spawn(worker, args=(i,)) for i in range(7)]
- gather(*ts)
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- with p.connection() as conn:
- conn.close()
- p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
-
-
@pytest.mark.slow
def test_stats_connect(dsn, proxy, monkeypatch):
proxy.start()
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type, Counter
-from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async
+from ..utils import AEvent, spawn, gather, asleep, is_async
try:
import psycopg_pool as pool
pytestmark = [pytest.mark.anyio]
-async def test_defaults(dsn):
+async def test_default_sizes(dsn):
async with pool.AsyncConnectionPool(dsn) as p:
assert p.min_size == p.max_size == 4
- assert p.timeout == 30
- assert p.max_idle == 10 * 60
- assert p.max_lifetime == 60 * 60
- assert p.num_workers == 3
@pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)])
pool.AsyncConnectionPool(min_size=min_size, max_size=max_size)
-async def test_connection_class(dsn):
- class MyConn(psycopg.AsyncConnection[Any]):
- pass
-
- async with pool.AsyncConnectionPool(dsn, connection_class=MyConn, min_size=1) as p:
- async with p.connection() as conn:
- assert isinstance(conn, MyConn)
-
-
-async def test_kwargs(dsn):
- async with pool.AsyncConnectionPool(
- dsn, kwargs={"autocommit": True}, min_size=1
- ) as p:
- async with p.connection() as conn:
- assert conn.autocommit
-
-
class MyRow(Dict[str, Any]):
...
assert conn.info.backend_pid in (pid1, pid2)
-async def test_context(dsn):
- async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
- assert not p.closed
- assert p.closed
-
-
@pytest.mark.crdb_skip("backend pid")
async def test_connection_not_lost(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
await p.wait(0.0001) # idempotent
-async def test_wait_closed(dsn):
- async with pool.AsyncConnectionPool(dsn) as p:
- pass
-
- with pytest.raises(pool.PoolClosed):
- await p.wait()
-
-
-@pytest.mark.slow
-async def test_setup_no_timeout(dsn, proxy):
- with pytest.raises(pool.PoolTimeout):
- async with pool.AsyncConnectionPool(
- proxy.client_dsn, min_size=1, num_workers=1
- ) as p:
- await p.wait(0.2)
-
- async with pool.AsyncConnectionPool(
- proxy.client_dsn, min_size=1, num_workers=1
- ) as p:
- await asleep(0.5)
- assert not p._pool
- proxy.start()
-
- async with p.connection() as conn:
- await conn.execute("select 1")
-
-
async def test_configure(dsn):
inits = 0
assert (await res.fetchone())[0] == "on" # type: ignore[index]
-@pytest.mark.slow
-async def test_configure_badstate(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- async def configure(conn):
- await conn.execute("select 1")
-
- async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p:
- with pytest.raises(pool.PoolTimeout):
- await p.wait(timeout=0.5)
-
- assert caplog.records
- assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-async def test_configure_broken(dsn, caplog):
- caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
- async def configure(conn):
- async with conn.transaction():
- await conn.execute("WAT")
-
- async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p:
- with pytest.raises(pool.PoolTimeout):
- await p.wait(timeout=0.5)
-
- assert caplog.records
- assert "WAT" in caplog.records[0].message
-
-
async def test_reset(dsn):
resets = 0
assert "WAT" in caplog.records[0].message
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue(dsn):
- async def worker(n):
- t0 = time()
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
- await p.wait()
- ts = [spawn(worker, args=(i,)) for i in range(6)]
- await gather(*ts)
-
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
-
- assert len(set(r[2] for r in results)) == 2, results
-
-
-@pytest.mark.slow
-async def test_queue_size(dsn):
- async def worker(t, ev=None):
- try:
- async with p.connection():
- if ev:
- ev.set()
- await asleep(t)
- except pool.TooManyRequests as e:
- errors.append(e)
- else:
- success.append(True)
-
- errors: List[Exception] = []
- success: List[bool] = []
-
- async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p:
- await p.wait()
- ev = AEvent()
- spawn(worker, args=(0.3, ev))
- await ev.wait()
-
- ts = [spawn(worker, args=(0.1,)) for i in range(4)]
- await gather(*ts)
-
- assert len(success) == 4
- assert len(errors) == 1
- assert isinstance(errors[0], pool.TooManyRequests)
- assert p.name in str(errors[0])
- assert str(p.max_waiting) in str(errors[0])
- assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout(dsn):
- async def worker(n):
- t0 = time()
- try:
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except pool.PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [spawn(worker, args=(i,)) for i in range(4)]
- await gather(*ts)
-
- assert len(results) == 2
- assert len(errors) == 2
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_dead_client(dsn):
- async def worker(i, timeout):
- try:
- async with p.connection(timeout=timeout) as conn:
- await conn.execute("select pg_sleep(0.3)")
- results.append(i)
- except pool.PoolTimeout:
- if timeout > 0.2:
- raise
-
- async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
- results: List[int] = []
- ts = [
- spawn(worker, args=(i, timeout))
- for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
- ]
- await gather(*ts)
-
- await asleep(0.2)
- assert set(results) == set([0, 1, 3, 4])
- assert len(p._pool) == 2 # no connection was lost
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout_override(dsn):
- async def worker(n):
- t0 = time()
- timeout = 0.25 if n == 3 else None
- try:
- async with p.connection(timeout=timeout) as conn:
- await conn.execute("select pg_sleep(0.2)")
- pid = conn.info.backend_pid
- except pool.PoolTimeout as e:
- t1 = time()
- errors.append((n, t1 - t0, e))
- else:
- t1 = time()
- results.append((n, t1 - t0, pid))
-
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
-
- async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [spawn(worker, args=(i,)) for i in range(4)]
- await gather(*ts)
-
- assert len(results) == 3
- assert len(errors) == 1
- for e in errors:
- assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-async def test_broken_reconnect(dsn):
- async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
- async with p.connection() as conn:
- pid1 = conn.info.backend_pid
- await conn.close()
-
- async with p.connection() as conn2:
- pid2 = conn2.info.backend_pid
-
- assert pid1 != pid2
-
-
@pytest.mark.crdb_skip("backend pid")
async def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "BAD" in caplog.records[2].message
-async def test_close_no_tasks(dsn):
- p = pool.AsyncConnectionPool(dsn)
- assert p._sched_runner and is_alive(p._sched_runner)
- workers = p._workers[:]
- assert workers
- for t in workers:
- assert is_alive(t)
-
- await p.close()
- assert p._sched_runner is None
- assert not p._workers
- for t in workers:
- assert not is_alive(t)
-
-
-async def test_putconn_no_pool(aconn_cls, dsn):
- async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
- conn = await aconn_cls.connect(dsn)
- with pytest.raises(ValueError):
- await p.putconn(conn)
-
- await conn.close()
-
-
-async def test_putconn_wrong_pool(dsn):
- async with pool.AsyncConnectionPool(dsn, min_size=1) as p1:
- async with pool.AsyncConnectionPool(dsn, min_size=1) as p2:
- conn = await p1.getconn()
- with pytest.raises(ValueError):
- await p2.putconn(conn)
-
-
async def test_del_no_warning(dsn, recwarn):
p = pool.AsyncConnectionPool(dsn, min_size=2)
async with p.connection() as conn:
assert not recwarn, [str(w.message) for w in recwarn.list]
-@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
-async def test_del_stops_threads(dsn):
- p = pool.AsyncConnectionPool(dsn)
- assert p._sched_runner is not None
- ts = [p._sched_runner] + p._workers
- del p
- await asleep(0.1)
- for t in ts:
- assert not is_alive(t), t
-
-
-async def test_closed_getconn(dsn):
- p = pool.AsyncConnectionPool(dsn, min_size=1)
- assert not p.closed
- async with p.connection():
- pass
-
- await p.close()
- assert p.closed
-
- with pytest.raises(pool.PoolClosed):
- async with p.connection():
- pass
-
-
async def test_closed_putconn(dsn):
- p = pool.AsyncConnectionPool(dsn, min_size=1)
-
- async with p.connection() as conn:
- pass
- assert not conn.closed
-
- async with p.connection() as conn:
- await p.close()
- assert conn.closed
-
-
-async def test_closed_queue(dsn):
- async def w1():
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
async with p.connection() as conn:
- e1.set() # Tell w0 that w1 got a connection
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
- await e2.wait() # Wait until w0 has tested w2
- success.append("w1")
-
- async def w2():
- try:
- async with p.connection():
- pass # unexpected
- except pool.PoolClosed:
- success.append("w2")
-
- e1 = AEvent()
- e2 = AEvent()
-
- p = pool.AsyncConnectionPool(dsn, min_size=1)
- await p.wait()
- success: List[str] = []
-
- t1 = spawn(w1)
- # Wait until w1 has received a connection
- await e1.wait()
-
- t2 = spawn(w2)
- # Wait until w2 is in the queue
- await ensure_waiting(p)
- await p.close()
-
- # Wait for the workers to finish
- e2.set()
- await gather(t1, t2)
- assert len(success) == 2
-
-
-async def test_open_explicit(dsn):
- p = pool.AsyncConnectionPool(dsn, open=False)
- assert p.closed
- with pytest.raises(pool.PoolClosed, match="is not open yet"):
- await p.getconn()
-
- with pytest.raises(pool.PoolClosed, match="is not open yet"):
- async with p.connection():
pass
-
- await p.open()
- try:
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- finally:
- await p.close()
-
- with pytest.raises(pool.PoolClosed, match="is already closed"):
- await p.getconn()
-
-
-async def test_open_context(dsn):
- p = pool.AsyncConnectionPool(dsn, open=False)
- assert p.closed
-
- async with p:
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- assert p.closed
-
-
-async def test_open_no_op(dsn):
- p = pool.AsyncConnectionPool(dsn)
- try:
- assert not p.closed
- await p.open()
- assert not p.closed
-
- async with p.connection() as conn:
- cur = await conn.execute("select 1")
- assert await cur.fetchone() == (1,)
-
- finally:
- await p.close()
+ assert not conn.closed
@pytest.mark.slow
await p.open(wait=True, timeout=0.5)
-async def test_reopen(dsn):
- p = pool.AsyncConnectionPool(dsn)
- async with p.connection() as conn:
- await conn.execute("select 1")
- await p.close()
- assert p._sched_runner is None
- assert not p._workers
-
- with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
- await p.open()
-
-
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize(
await p.resize(min_size=min_size, max_size=max_size)
-async def test_jitter():
- rnds = [pool.AsyncConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
- assert 27 <= min(rnds) <= 28
- assert 35 < max(rnds) < 36
-
-
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.crdb_skip("backend pid")
assert conn.info.backend_pid != pid
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_measures(dsn):
- async def worker(n):
- async with p.connection() as conn:
- await conn.execute("select pg_sleep(0.2)")
-
- async with pool.AsyncConnectionPool(dsn, min_size=2, max_size=4) as p:
- await p.wait(2.0)
-
- stats = p.get_stats()
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 2
- assert stats["pool_available"] == 2
- assert stats["requests_waiting"] == 0
-
- ts = [spawn(worker, args=(i,)) for i in range(3)]
- await asleep(0.1)
- stats = p.get_stats()
- await gather(*ts)
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 3
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 0
-
- await p.wait(2.0)
- ts = [spawn(worker, args=(i,)) for i in range(7)]
- await asleep(0.1)
- stats = p.get_stats()
- await gather(*ts)
- assert stats["pool_min"] == 2
- assert stats["pool_max"] == 4
- assert stats["pool_size"] == 4
- assert stats["pool_available"] == 0
- assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_usage(dsn):
- async def worker(n):
- try:
- async with p.connection(timeout=0.3) as conn:
- await conn.execute("select pg_sleep(0.2)")
- except pool.PoolTimeout:
- pass
-
- async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
- await p.wait(2.0)
-
- ts = [spawn(worker, args=(i,)) for i in range(7)]
- await gather(*ts)
- stats = p.get_stats()
- assert stats["requests_num"] == 7
- assert stats["requests_queued"] == 4
- assert 850 <= stats["requests_wait_ms"] <= 950
- assert stats["requests_errors"] == 1
- assert 1150 <= stats["usage_ms"] <= 1250
- assert stats.get("returns_bad", 0) == 0
-
- async with p.connection() as conn:
- await conn.close()
- await p.wait()
- stats = p.pop_stats()
- assert stats["requests_num"] == 8
- assert stats["returns_bad"] == 1
- async with p.connection():
- pass
- assert p.get_stats()["requests_num"] == 1
-
-
@pytest.mark.slow
async def test_stats_connect(dsn, proxy, monkeypatch):
proxy.start()
--- /dev/null
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_pool_common_async.py'
+# DO NOT CHANGE! Change the original file instead.
+import logging
+from time import time
+from typing import Any, List, Tuple
+
+import pytest
+
+import psycopg
+
+from ..utils import Event, spawn, gather, sleep, is_alive, is_async
+
+try:
+ import psycopg_pool as pool
+except ImportError:
+ # Tests should have been skipped if the package is not available
+ pass
+
+
+@pytest.fixture(params=[pool.ConnectionPool, pool.NullConnectionPool])
+def pool_cls(request):
+ return request.param
+
+
+def test_defaults(pool_cls, dsn):
+ with pool_cls(dsn) as p:
+ assert p.timeout == 30
+ assert p.max_idle == 10 * 60
+ assert p.max_lifetime == 60 * 60
+ assert p.num_workers == 3
+
+
+def test_connection_class(pool_cls, dsn):
+ class MyConn(psycopg.Connection[Any]):
+ pass
+
+ with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p:
+ with p.connection() as conn:
+ assert isinstance(conn, MyConn)
+
+
+def test_kwargs(pool_cls, dsn):
+ with pool_cls(dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls)) as p:
+ with p.connection() as conn:
+ assert conn.autocommit
+
+
+def test_context(pool_cls, dsn):
+ with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+ assert not p.closed
+ assert p.closed
+
+
+def test_wait_closed(pool_cls, dsn):
+ with pool_cls(dsn) as p:
+ pass
+
+ with pytest.raises(pool.PoolClosed):
+ p.wait()
+
+
+@pytest.mark.slow
+def test_setup_no_timeout(pool_cls, dsn, proxy):
+ with pytest.raises(pool.PoolTimeout):
+ with pool_cls(
+ proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+ ) as p:
+ p.wait(0.2)
+
+ with pool_cls(proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1) as p:
+ sleep(0.5)
+ assert not p._pool
+ proxy.start()
+
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+
+@pytest.mark.slow
+def test_configure_badstate(pool_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+ def configure(conn):
+ conn.execute("select 1")
+
+ with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "INTRANS" in caplog.records[0].message
+
+
+@pytest.mark.slow
+def test_configure_broken(pool_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+ def configure(conn):
+ with conn.transaction():
+ conn.execute("WAT")
+
+ with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "WAT" in caplog.records[0].message
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue(pool_cls, dsn):
+ def worker(n):
+ t0 = time()
+ with p.connection() as conn:
+ conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+ p.wait()
+ ts = [spawn(worker, args=(i,)) for i in range(6)]
+ gather(*ts)
+
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.2), times
+
+ assert len(set((r[2] for r in results))) == 2, results
+
+
+@pytest.mark.slow
+def test_queue_size(pool_cls, dsn):
+ def worker(t, ev=None):
+ try:
+ with p.connection():
+ if ev:
+ ev.set()
+ sleep(t)
+ except pool.TooManyRequests as e:
+ errors.append(e)
+ else:
+ success.append(True)
+
+ errors: List[Exception] = []
+ success: List[bool] = []
+
+ with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3) as p:
+ p.wait()
+ ev = Event()
+ spawn(worker, args=(0.3, ev))
+ ev.wait()
+
+ ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+ gather(*ts)
+
+ assert len(success) == 4
+ assert len(errors) == 1
+ assert isinstance(errors[0], pool.TooManyRequests)
+ assert p.name in str(errors[0])
+ assert str(p.max_waiting) in str(errors[0])
+ assert p.get_stats()["requests_errors"] == 1
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue_timeout(pool_cls, dsn):
+ def worker(n):
+ t0 = time()
+ try:
+ with p.connection() as conn:
+ conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ except pool.PoolTimeout as e:
+ t1 = time()
+ errors.append((n, t1 - t0, e))
+ else:
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ gather(*ts)
+
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_dead_client(pool_cls, dsn):
+ def worker(i, timeout):
+ try:
+ with p.connection(timeout=timeout) as conn:
+ conn.execute("select pg_sleep(0.3)")
+ results.append(i)
+ except pool.PoolTimeout:
+ if timeout > 0.2:
+ raise
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+ results: List[int] = []
+ ts = [
+ spawn(worker, args=(i, timeout))
+ for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
+ ]
+ gather(*ts)
+
+ sleep(0.2)
+ assert set(results) == set([0, 1, 3, 4])
+ if pool_cls is pool.ConnectionPool:
+ assert len(p._pool) == 2 # no connection was lost
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue_timeout_override(pool_cls, dsn):
+ def worker(n):
+ t0 = time()
+ timeout = 0.25 if n == 3 else None
+ try:
+ with p.connection(timeout=timeout) as conn:
+ conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ except pool.PoolTimeout as e:
+ t1 = time()
+ errors.append((n, t1 - t0, e))
+ else:
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ gather(*ts)
+
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.crdb_skip("backend pid")
+def test_broken_reconnect(pool_cls, dsn):
+ with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+ with p.connection() as conn:
+ pid1 = conn.info.backend_pid
+ conn.close()
+
+ with p.connection() as conn2:
+ pid2 = conn2.info.backend_pid
+
+ assert pid1 != pid2
+
+
+def test_close_no_tasks(pool_cls, dsn):
+ p = pool_cls(dsn)
+ assert p._sched_runner and is_alive(p._sched_runner)
+ workers = p._workers[:]
+ assert workers
+ for t in workers:
+ assert is_alive(t)
+
+ p.close()
+ assert p._sched_runner is None
+ assert not p._workers
+ for t in workers:
+ assert not is_alive(t)
+
+
+def test_putconn_no_pool(pool_cls, conn_cls, dsn):
+ with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+ conn = conn_cls.connect(dsn)
+ with pytest.raises(ValueError):
+ p.putconn(conn)
+
+ conn.close()
+
+
+def test_putconn_wrong_pool(pool_cls, dsn):
+ with pool_cls(dsn, min_size=min_size(pool_cls)) as p1:
+ with pool_cls(dsn, min_size=min_size(pool_cls)) as p2:
+ conn = p1.getconn()
+ with pytest.raises(ValueError):
+ p2.putconn(conn)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_del_stops_threads(pool_cls, dsn):
+ p = pool_cls(dsn)
+ assert p._sched_runner is not None
+ ts = [p._sched_runner] + p._workers
+ del p
+ sleep(0.1)
+ for t in ts:
+ assert not is_alive(t), t
+
+
+def test_closed_getconn(pool_cls, dsn):
+ p = pool_cls(dsn, min_size=min_size(pool_cls))
+ assert not p.closed
+ with p.connection():
+ pass
+
+ p.close()
+ assert p.closed
+
+ with pytest.raises(pool.PoolClosed):
+ with p.connection():
+ pass
+
+
+def test_close_connection_on_pool_close(pool_cls, dsn):
+ p = pool_cls(dsn, min_size=min_size(pool_cls))
+ with p.connection() as conn:
+ p.close()
+ assert conn.closed
+
+
+def test_closed_queue(pool_cls, dsn):
+ def w1():
+ with p.connection() as conn:
+ e1.set() # Tell w0 that w1 got a connection
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+ e2.wait() # Wait until w0 has tested w2
+ success.append("w1")
+
+ def w2():
+ try:
+ with p.connection():
+ pass # unexpected
+ except pool.PoolClosed:
+ success.append("w2")
+
+ e1 = Event()
+ e2 = Event()
+
+ p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1)
+ p.wait()
+ success: List[str] = []
+
+ t1 = spawn(w1)
+ # Wait until w1 has received a connection
+ e1.wait()
+
+ t2 = spawn(w2)
+ # Wait until w2 is in the queue
+ ensure_waiting(p)
+ p.close()
+
+ # Wait for the workers to finish
+ e2.set()
+ gather(t1, t2)
+ assert len(success) == 2
+
+
+def test_open_explicit(pool_cls, dsn):
+ p = pool_cls(dsn, open=False)
+ assert p.closed
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
+ p.getconn()
+
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
+ with p.connection():
+ pass
+
+ p.open()
+ try:
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+ finally:
+ p.close()
+
+ with pytest.raises(pool.PoolClosed, match="is already closed"):
+ p.getconn()
+
+
+def test_open_context(pool_cls, dsn):
+ p = pool_cls(dsn, open=False)
+ assert p.closed
+
+ with p:
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ assert p.closed
+
+
+def test_open_no_op(pool_cls, dsn):
+ p = pool_cls(dsn)
+ try:
+ assert not p.closed
+ p.open()
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+ finally:
+ p.close()
+
+
+def test_reopen(pool_cls, dsn):
+ p = pool_cls(dsn)
+ with p.connection() as conn:
+ conn.execute("select 1")
+ p.close()
+ assert p._sched_runner is None
+ assert not p._workers
+
+ with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
+ p.open()
+
+
+def test_jitter(pool_cls):
+ rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)]
+ assert 27 <= min(rnds) <= 28
+ assert 35 < max(rnds) < 36
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_stats_measures(pool_cls, dsn):
+ def worker(n):
+ with p.connection() as conn:
+ conn.execute("select pg_sleep(0.2)")
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p:
+ p.wait(2.0)
+
+ stats = p.get_stats()
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == min_size(pool_cls, 2)
+ assert stats["pool_available"] == min_size(pool_cls, 2)
+ assert stats["requests_waiting"] == 0
+
+ ts = [spawn(worker, args=(i,)) for i in range(3)]
+ sleep(0.1)
+ stats = p.get_stats()
+ gather(*ts)
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 3
+ assert stats["pool_available"] == 0
+ assert stats["requests_waiting"] == 0
+
+ p.wait(2.0)
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ sleep(0.1)
+ stats = p.get_stats()
+ gather(*ts)
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 4
+ assert stats["pool_available"] == 0
+ assert stats["requests_waiting"] == 3
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_stats_usage(pool_cls, dsn):
+ def worker(n):
+ try:
+ with p.connection(timeout=0.3) as conn:
+ conn.execute("select pg_sleep(0.2)")
+ except pool.PoolTimeout:
+ pass
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p:
+ p.wait(2.0)
+
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ gather(*ts)
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ with p.connection() as conn:
+ conn.close()
+ p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
+
+
+def test_debug_deadlock(pool_cls, dsn):
+ # https://github.com/psycopg/psycopg/issues/230
+ logger = logging.getLogger("psycopg")
+ handler = logging.StreamHandler()
+ old_level = logger.level
+ logger.setLevel(logging.DEBUG)
+ handler.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+ try:
+ with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+ p.wait(timeout=2)
+ finally:
+ logger.removeHandler(handler)
+ logger.setLevel(old_level)
+
+
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+def test_cancellation_in_queue(pool_cls, dsn):
+ # https://github.com/psycopg/psycopg/issues/509
+
+ nconns = 3
+
+ with pool_cls(
+ dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1
+ ) as p:
+ p.wait()
+
+ got_conns = []
+ ev = Event()
+
+ def worker(i):
+ try:
+ logging.info("worker %s started", i)
+ nonlocal got_conns
+
+ with p.connection() as conn:
+ logging.info("worker %s got conn", i)
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ got_conns.append(conn)
+ if len(got_conns) >= nconns:
+ ev.set()
+
+ sleep(5)
+ except BaseException as ex:
+ logging.info("worker %s stopped: %r", i, ex)
+ raise
+
+ # Start tasks taking up all the connections and getting in the queue
+ tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
+
+ # wait until the pool has served all the connections and clients are queued.
+ ev.wait(3.0)
+ for i in range(10):
+ if p.get_stats().get("requests_queued", 0):
+ break
+ else:
+ sleep(0.1)
+ else:
+ pytest.fail("no client got in the queue")
+
+ [task.cancel() for task in reversed(tasks)]
+ gather(*tasks, return_exceptions=True, timeout=1.0)
+
+ stats = p.get_stats()
+ assert stats["pool_available"] == min_size(pool_cls, nconns)
+ assert stats.get("requests_waiting", 0) == 0
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+
+def min_size(pool_cls, num=1):
+ """Return the minimum min_size supported by the pool class."""
+ if pool_cls is pool.ConnectionPool:
+ return num
+ elif pool_cls is pool.NullConnectionPool:
+ return 0
+ else:
+ assert False, pool_cls
+
+
+def delay_connection(monkeypatch, sec):
+ """
+ Return a _connect_gen function delayed by the amount of seconds
+ """
+
+ def connect_delay(*args, **kwargs):
+ t0 = time()
+ rv = connect_orig(*args, **kwargs)
+ t1 = time()
+ sleep(max(0, sec - (t1 - t0)))
+ return rv
+
+ connect_orig = psycopg.Connection.connect
+ monkeypatch.setattr(psycopg.Connection, "connect", connect_delay)
+
+
+def ensure_waiting(p, num=1):
+ """
+ Wait until there are at least *num* clients waiting in the queue.
+ """
+ while len(p._waiting) < num:
+ sleep(0)
--- /dev/null
+import logging
+from time import time
+from typing import Any, List, Tuple
+
+import pytest
+
+import psycopg
+
+from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async
+
+try:
+ import psycopg_pool as pool
+except ImportError:
+ # Tests should have been skipped if the package is not available
+ pass
+
+if True: # ASYNC
+ pytestmark = [pytest.mark.anyio]
+
+
+@pytest.fixture(params=[pool.AsyncConnectionPool, pool.AsyncNullConnectionPool])
+def pool_cls(request):
+ return request.param
+
+
+async def test_defaults(pool_cls, dsn):
+ async with pool_cls(dsn) as p:
+ assert p.timeout == 30
+ assert p.max_idle == 10 * 60
+ assert p.max_lifetime == 60 * 60
+ assert p.num_workers == 3
+
+
+async def test_connection_class(pool_cls, dsn):
+ class MyConn(psycopg.AsyncConnection[Any]):
+ pass
+
+ async with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p:
+ async with p.connection() as conn:
+ assert isinstance(conn, MyConn)
+
+
+async def test_kwargs(pool_cls, dsn):
+ async with pool_cls(
+ dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls)
+ ) as p:
+ async with p.connection() as conn:
+ assert conn.autocommit
+
+
+async def test_context(pool_cls, dsn):
+ async with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+ assert not p.closed
+ assert p.closed
+
+
+async def test_wait_closed(pool_cls, dsn):
+ async with pool_cls(dsn) as p:
+ pass
+
+ with pytest.raises(pool.PoolClosed):
+ await p.wait()
+
+
+@pytest.mark.slow
+async def test_setup_no_timeout(pool_cls, dsn, proxy):
+ with pytest.raises(pool.PoolTimeout):
+ async with pool_cls(
+ proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+ ) as p:
+ await p.wait(0.2)
+
+ async with pool_cls(
+ proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+ ) as p:
+ await asleep(0.5)
+ assert not p._pool
+ proxy.start()
+
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+
+@pytest.mark.slow
+async def test_configure_badstate(pool_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+ async def configure(conn):
+ await conn.execute("select 1")
+
+ async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ await p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "INTRANS" in caplog.records[0].message
+
+
+@pytest.mark.slow
+async def test_configure_broken(pool_cls, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+ async def configure(conn):
+ async with conn.transaction():
+ await conn.execute("WAT")
+
+ async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ await p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "WAT" in caplog.records[0].message
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue(pool_cls, dsn):
+ async def worker(n):
+ t0 = time()
+ async with p.connection() as conn:
+ await conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+ await p.wait()
+ ts = [spawn(worker, args=(i,)) for i in range(6)]
+ await gather(*ts)
+
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.2), times
+
+ assert len(set(r[2] for r in results)) == 2, results
+
+
+@pytest.mark.slow
+async def test_queue_size(pool_cls, dsn):
+ async def worker(t, ev=None):
+ try:
+ async with p.connection():
+ if ev:
+ ev.set()
+ await asleep(t)
+ except pool.TooManyRequests as e:
+ errors.append(e)
+ else:
+ success.append(True)
+
+ errors: List[Exception] = []
+ success: List[bool] = []
+
+ async with pool_cls(
+ dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3
+ ) as p:
+ await p.wait()
+ ev = AEvent()
+ spawn(worker, args=(0.3, ev))
+ await ev.wait()
+
+ ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+ await gather(*ts)
+
+ assert len(success) == 4
+ assert len(errors) == 1
+ assert isinstance(errors[0], pool.TooManyRequests)
+ assert p.name in str(errors[0])
+ assert str(p.max_waiting) in str(errors[0])
+ assert p.get_stats()["requests_errors"] == 1
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue_timeout(pool_cls, dsn):
+ async def worker(n):
+ t0 = time()
+ try:
+ async with p.connection() as conn:
+ await conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ except pool.PoolTimeout as e:
+ t1 = time()
+ errors.append((n, t1 - t0, e))
+ else:
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
+
+ async with pool_cls(
+ dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
+ ) as p:
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ await gather(*ts)
+
+ assert len(results) == 2
+ assert len(errors) == 2
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_dead_client(pool_cls, dsn):
+ async def worker(i, timeout):
+ try:
+ async with p.connection(timeout=timeout) as conn:
+ await conn.execute("select pg_sleep(0.3)")
+ results.append(i)
+ except pool.PoolTimeout:
+ if timeout > 0.2:
+ raise
+
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+ results: List[int] = []
+ ts = [
+ spawn(worker, args=(i, timeout))
+ for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
+ ]
+ await gather(*ts)
+
+ await asleep(0.2)
+ assert set(results) == set([0, 1, 3, 4])
+ if pool_cls is pool.AsyncConnectionPool:
+ assert len(p._pool) == 2 # no connection was lost
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue_timeout_override(pool_cls, dsn):
+ async def worker(n):
+ t0 = time()
+ timeout = 0.25 if n == 3 else None
+ try:
+ async with p.connection(timeout=timeout) as conn:
+ await conn.execute("select pg_sleep(0.2)")
+ pid = conn.info.backend_pid
+ except pool.PoolTimeout as e:
+ t1 = time()
+ errors.append((n, t1 - t0, e))
+ else:
+ t1 = time()
+ results.append((n, t1 - t0, pid))
+
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
+
+ async with pool_cls(
+ dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
+ ) as p:
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ await gather(*ts)
+
+ assert len(results) == 3
+ assert len(errors) == 1
+ for e in errors:
+ assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.crdb_skip("backend pid")
+async def test_broken_reconnect(pool_cls, dsn):
+ async with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+ async with p.connection() as conn:
+ pid1 = conn.info.backend_pid
+ await conn.close()
+
+ async with p.connection() as conn2:
+ pid2 = conn2.info.backend_pid
+
+ assert pid1 != pid2
+
+
+async def test_close_no_tasks(pool_cls, dsn):
+ p = pool_cls(dsn)
+ assert p._sched_runner and is_alive(p._sched_runner)
+ workers = p._workers[:]
+ assert workers
+ for t in workers:
+ assert is_alive(t)
+
+ await p.close()
+ assert p._sched_runner is None
+ assert not p._workers
+ for t in workers:
+ assert not is_alive(t)
+
+
+async def test_putconn_no_pool(pool_cls, aconn_cls, dsn):
+ async with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+ conn = await aconn_cls.connect(dsn)
+ with pytest.raises(ValueError):
+ await p.putconn(conn)
+
+ await conn.close()
+
+
+async def test_putconn_wrong_pool(pool_cls, dsn):
+ async with pool_cls(dsn, min_size=min_size(pool_cls)) as p1:
+ async with pool_cls(dsn, min_size=min_size(pool_cls)) as p2:
+ conn = await p1.getconn()
+ with pytest.raises(ValueError):
+ await p2.putconn(conn)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+async def test_del_stops_threads(pool_cls, dsn):
+ p = pool_cls(dsn)
+ assert p._sched_runner is not None
+ ts = [p._sched_runner] + p._workers
+ del p
+ await asleep(0.1)
+ for t in ts:
+ assert not is_alive(t), t
+
+
+async def test_closed_getconn(pool_cls, dsn):
+ p = pool_cls(dsn, min_size=min_size(pool_cls))
+ assert not p.closed
+ async with p.connection():
+ pass
+
+ await p.close()
+ assert p.closed
+
+ with pytest.raises(pool.PoolClosed):
+ async with p.connection():
+ pass
+
+
+async def test_close_connection_on_pool_close(pool_cls, dsn):
+ p = pool_cls(dsn, min_size=min_size(pool_cls))
+ async with p.connection() as conn:
+ await p.close()
+ assert conn.closed
+
+
+async def test_closed_queue(pool_cls, dsn):
+ async def w1():
+ async with p.connection() as conn:
+ e1.set() # Tell w0 that w1 got a connection
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+ await e2.wait() # Wait until w0 has tested w2
+ success.append("w1")
+
+ async def w2():
+ try:
+ async with p.connection():
+ pass # unexpected
+ except pool.PoolClosed:
+ success.append("w2")
+
+ e1 = AEvent()
+ e2 = AEvent()
+
+ p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1)
+ await p.wait()
+ success: List[str] = []
+
+ t1 = spawn(w1)
+ # Wait until w1 has received a connection
+ await e1.wait()
+
+ t2 = spawn(w2)
+ # Wait until w2 is in the queue
+ await ensure_waiting(p)
+ await p.close()
+
+ # Wait for the workers to finish
+ e2.set()
+ await gather(t1, t2)
+ assert len(success) == 2
+
+
+async def test_open_explicit(pool_cls, dsn):
+ p = pool_cls(dsn, open=False)
+ assert p.closed
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
+ await p.getconn()
+
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
+ async with p.connection():
+ pass
+
+ await p.open()
+ try:
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ finally:
+ await p.close()
+
+ with pytest.raises(pool.PoolClosed, match="is already closed"):
+ await p.getconn()
+
+
+async def test_open_context(pool_cls, dsn):
+ p = pool_cls(dsn, open=False)
+ assert p.closed
+
+ async with p:
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ assert p.closed
+
+
+async def test_open_no_op(pool_cls, dsn):
+ p = pool_cls(dsn)
+ try:
+ assert not p.closed
+ await p.open()
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ finally:
+ await p.close()
+
+
+async def test_reopen(pool_cls, dsn):
+ p = pool_cls(dsn)
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+ await p.close()
+ assert p._sched_runner is None
+ assert not p._workers
+
+ with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
+ await p.open()
+
+
+async def test_jitter(pool_cls):
+ rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)]
+ assert 27 <= min(rnds) <= 28
+ assert 35 < max(rnds) < 36
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_stats_measures(pool_cls, dsn):
+ async def worker(n):
+ async with p.connection() as conn:
+ await conn.execute("select pg_sleep(0.2)")
+
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p:
+ await p.wait(2.0)
+
+ stats = p.get_stats()
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == min_size(pool_cls, 2)
+ assert stats["pool_available"] == min_size(pool_cls, 2)
+ assert stats["requests_waiting"] == 0
+
+ ts = [spawn(worker, args=(i,)) for i in range(3)]
+ await asleep(0.1)
+ stats = p.get_stats()
+ await gather(*ts)
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 3
+ assert stats["pool_available"] == 0
+ assert stats["requests_waiting"] == 0
+
+ await p.wait(2.0)
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ await asleep(0.1)
+ stats = p.get_stats()
+ await gather(*ts)
+ assert stats["pool_min"] == min_size(pool_cls, 2)
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 4
+ assert stats["pool_available"] == 0
+ assert stats["requests_waiting"] == 3
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_stats_usage(pool_cls, dsn):
+ async def worker(n):
+ try:
+ async with p.connection(timeout=0.3) as conn:
+ await conn.execute("select pg_sleep(0.2)")
+ except pool.PoolTimeout:
+ pass
+
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p:
+ await p.wait(2.0)
+
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ await gather(*ts)
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_errors"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ async with p.connection() as conn:
+ await conn.close()
+ await p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ async with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
+
+
+async def test_debug_deadlock(pool_cls, dsn):
+ # https://github.com/psycopg/psycopg/issues/230
+ logger = logging.getLogger("psycopg")
+ handler = logging.StreamHandler()
+ old_level = logger.level
+ logger.setLevel(logging.DEBUG)
+ handler.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+ try:
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+ await p.wait(timeout=2)
+ finally:
+ logger.removeHandler(handler)
+ logger.setLevel(old_level)
+
+
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+async def test_cancellation_in_queue(pool_cls, dsn):
+ # https://github.com/psycopg/psycopg/issues/509
+
+ nconns = 3
+
+ async with pool_cls(
+ dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1
+ ) as p:
+ await p.wait()
+
+ got_conns = []
+ ev = AEvent()
+
+ async def worker(i):
+ try:
+ logging.info("worker %s started", i)
+ nonlocal got_conns
+
+ async with p.connection() as conn:
+ logging.info("worker %s got conn", i)
+ cur = await conn.execute("select 1")
+ assert (await cur.fetchone()) == (1,)
+
+ got_conns.append(conn)
+ if len(got_conns) >= nconns:
+ ev.set()
+
+ await asleep(5)
+
+ except BaseException as ex:
+ logging.info("worker %s stopped: %r", i, ex)
+ raise
+
+ # Start tasks taking up all the connections and getting in the queue
+ tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
+
+ # wait until the pool has served all the connections and clients are queued.
+ await ev.wait_timeout(3.0)
+ for i in range(10):
+ if p.get_stats().get("requests_queued", 0):
+ break
+ else:
+ await asleep(0.1)
+ else:
+ pytest.fail("no client got in the queue")
+
+ [task.cancel() for task in reversed(tasks)]
+ await gather(*tasks, return_exceptions=True, timeout=1.0)
+
+ stats = p.get_stats()
+ assert stats["pool_available"] == min_size(pool_cls, nconns)
+ assert stats.get("requests_waiting", 0) == 0
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+
+def min_size(pool_cls, num=1):
+ """Return the minimum min_size supported by the pool class."""
+ if pool_cls is pool.AsyncConnectionPool:
+ return num
+ elif pool_cls is pool.AsyncNullConnectionPool:
+ return 0
+ else:
+ assert False, pool_cls
+
+
+def delay_connection(monkeypatch, sec):
+ """
+ Return a _connect_gen function delayed by the amount of seconds
+ """
+
+ async def connect_delay(*args, **kwargs):
+ t0 = time()
+ rv = await connect_orig(*args, **kwargs)
+ t1 = time()
+ await asleep(max(0, sec - (t1 - t0)))
+ return rv
+
+ connect_orig = psycopg.AsyncConnection.connect
+ monkeypatch.setattr(psycopg.AsyncConnection, "connect", connect_delay)
+
+
+async def ensure_waiting(p, num=1):
+ """
+ Wait until there are at least *num* clients waiting in the queue.
+ """
+ while len(p._waiting) < num:
+ await asleep(0)
"AsyncGenerator": "Generator",
"AsyncIterator": "Iterator",
"AsyncLibpqWriter": "LibpqWriter",
+ "AsyncNullConnectionPool": "NullConnectionPool",
"AsyncPipeline": "Pipeline",
"AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
"AsyncRawCursor": "RawCursor",
psycopg/psycopg/cursor_async.py \
psycopg_pool/psycopg_pool/sched_async.py \
tests/pool/test_pool_async.py \
+ tests/pool/test_pool_common_async.py \
tests/pool/test_sched_async.py \
tests/test_connection_async.py \
tests/test_copy_async.py \