From: Daniele Varrazzo Date: Tue, 12 Sep 2023 04:48:10 +0000 (+0200) Subject: refactor(tests): generate most pool tests using parametric fixture X-Git-Tag: pool-3.2.0~12^2~32 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2168f9aebde6f5e206ff52398d52249710713e84;p=thirdparty%2Fpsycopg.git refactor(tests): generate most pool tests using parametric fixture --- diff --git a/tests/pool/test_null_pool_async.py b/tests/pool/test_null_pool_async.py index 55708e50a..56a016397 100644 --- a/tests/pool/test_null_pool_async.py +++ b/tests/pool/test_null_pool_async.py @@ -16,23 +16,18 @@ from .test_pool_async import delay_connection, ensure_waiting pytestmark = [pytest.mark.anyio] try: - from psycopg_pool import AsyncNullConnectionPool # noqa: F401 - from psycopg_pool import PoolClosed, PoolTimeout, TooManyRequests + import psycopg_pool as pool except ImportError: pass -async def test_defaults(dsn): - async with AsyncNullConnectionPool(dsn) as p: +async def test_default_sizes(dsn): + async with pool.AsyncNullConnectionPool(dsn) as p: assert p.min_size == p.max_size == 0 - assert p.timeout == 30 - assert p.max_idle == 10 * 60 - assert p.max_lifetime == 60 * 60 - assert p.num_workers == 3 async def test_min_size_max_size(dsn): - async with AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p: + async with pool.AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p: assert p.min_size == 0 assert p.max_size == 2 @@ -40,22 +35,7 @@ async def test_min_size_max_size(dsn): @pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)]) async def test_bad_size(dsn, min_size, max_size): with pytest.raises(ValueError): - AsyncNullConnectionPool(min_size=min_size, max_size=max_size) - - -async def test_connection_class(dsn): - class MyConn(psycopg.AsyncConnection[Any]): - pass - - async with AsyncNullConnectionPool(dsn, connection_class=MyConn) as p: - async with p.connection() as conn: - assert isinstance(conn, MyConn) - - -async def test_kwargs(dsn): - async with AsyncNullConnectionPool(dsn, kwargs={"autocommit": True}) as p: - async with p.connection() as conn: - assert conn.autocommit + pool.AsyncNullConnectionPool(min_size=min_size, max_size=max_size) class MyRow(Dict[str, Any]): @@ -69,7 +49,7 @@ async def test_generic_connection_type(dsn): class MyConnection(psycopg.AsyncConnection[Row]): pass - async with AsyncNullConnectionPool( + async with pool.AsyncNullConnectionPool( dsn, connection_class=MyConnection[MyRow], kwargs={"row_factory": class_row(MyRow)}, @@ -78,19 +58,19 @@ async def test_generic_connection_type(dsn): async with p1.connection() as conn1: cur1 = await conn1.execute("select 1 as x") (row1,) = await cur1.fetchall() - assert_type(p1, AsyncNullConnectionPool[MyConnection[MyRow]]) + assert_type(p1, pool.AsyncNullConnectionPool[MyConnection[MyRow]]) assert_type(conn1, MyConnection[MyRow]) assert_type(row1, MyRow) assert conn1.autocommit assert row1 == {"x": 1} - async with AsyncNullConnectionPool( + async with pool.AsyncNullConnectionPool( dsn, connection_class=MyConnection[TupleRow] ) as p2: async with p2.connection() as conn2: cur2 = await conn2.execute("select 2 as y") (row2,) = await cur2.fetchall() - assert_type(p2, AsyncNullConnectionPool[MyConnection[TupleRow]]) + assert_type(p2, pool.AsyncNullConnectionPool[MyConnection[TupleRow]]) assert_type(conn2, MyConnection[TupleRow]) assert_type(row2, TupleRow) assert row2 == (2,) @@ -105,12 +85,12 @@ async def test_non_generic_connection_type(dsn): kwargs["row_factory"] = class_row(MyRow) 
super().__init__(*args, **kwargs) - async with AsyncNullConnectionPool( + async with pool.AsyncNullConnectionPool( dsn, connection_class=MyConnection, configure=set_autocommit ) as p1: async with p1.connection() as conn1: (row1,) = await (await conn1.execute("select 1 as x")).fetchall() - assert_type(p1, AsyncNullConnectionPool[MyConnection]) + assert_type(p1, pool.AsyncNullConnectionPool[MyConnection]) assert_type(conn1, MyConnection) assert_type(row1, MyRow) assert conn1.autocommit @@ -119,7 +99,7 @@ async def test_non_generic_connection_type(dsn): @pytest.mark.crdb_skip("backend pid") async def test_its_no_pool_at_all(dsn): - async with AsyncNullConnectionPool(dsn, max_size=2) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=2) as p: async with p.connection() as conn: pid1 = conn.info.backend_pid @@ -130,47 +110,18 @@ async def test_its_no_pool_at_all(dsn): assert conn.info.backend_pid not in (pid1, pid2) -async def test_context(dsn): - async with AsyncNullConnectionPool(dsn) as p: - assert not p.closed - assert p.closed - - @pytest.mark.slow @pytest.mark.timing async def test_wait_ready(dsn, monkeypatch): delay_connection(monkeypatch, 0.2) - with pytest.raises(PoolTimeout): - async with AsyncNullConnectionPool(dsn, num_workers=1) as p: + with pytest.raises(pool.PoolTimeout): + async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p: await p.wait(0.1) - async with AsyncNullConnectionPool(dsn, num_workers=1) as p: + async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p: await p.wait(0.4) -async def test_wait_closed(dsn): - async with AsyncNullConnectionPool(dsn) as p: - pass - - with pytest.raises(PoolClosed): - await p.wait() - - -@pytest.mark.slow -async def test_setup_no_timeout(dsn, proxy): - with pytest.raises(PoolTimeout): - async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p: - await p.wait(0.2) - - async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p: - await asyncio.sleep(0.5) - assert not p._pool - proxy.start() - - async with p.connection() as conn: - await conn.execute("select 1") - - async def test_configure(dsn): inits = 0 @@ -180,7 +131,7 @@ async def test_configure(dsn): async with conn.transaction(): await conn.execute("set default_transaction_read_only to on") - async with AsyncNullConnectionPool(dsn, configure=configure) as p: + async with pool.AsyncNullConnectionPool(dsn, configure=configure) as p: async with p.connection() as conn: assert inits == 1 res = await conn.execute("show default_transaction_read_only") @@ -198,37 +149,6 @@ async def test_configure(dsn): assert (await res.fetchone())[0] == "on" # type: ignore[index] -@pytest.mark.slow -async def test_configure_badstate(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - async def configure(conn): - await conn.execute("select 1") - - async with AsyncNullConnectionPool(dsn, configure=configure) as p: - with pytest.raises(PoolTimeout): - await p.wait(timeout=0.5) - - assert caplog.records - assert "INTRANS" in caplog.records[0].message - - -@pytest.mark.slow -async def test_configure_broken(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - async def configure(conn): - async with conn.transaction(): - await conn.execute("WAT") - - async with AsyncNullConnectionPool(dsn, configure=configure) as p: - with pytest.raises(PoolTimeout): - await p.wait(timeout=0.5) - - assert caplog.records - assert "WAT" in caplog.records[0].message - - @pytest.mark.crdb_skip("backend pid") async def 
test_reset(dsn): resets = 0 @@ -252,7 +172,7 @@ async def test_reset(dsn): assert (await cur.fetchone()) == ("UTC",) pids.append(conn.info.backend_pid) - async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: # Queue the worker so it will take the same connection a second time # instead of making a new one. @@ -284,7 +204,7 @@ async def test_reset_badstate(dsn, caplog): await conn.execute("select 1") pids.append(conn.info.backend_pid) - async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: t = create_task(worker()) await ensure_waiting(p) @@ -314,7 +234,7 @@ async def test_reset_broken(dsn, caplog): await conn.execute("select 1") pids.append(conn.info.backend_pid) - async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: t = create_task(worker()) await ensure_waiting(p) @@ -332,171 +252,14 @@ async def test_reset_broken(dsn, caplog): @pytest.mark.slow @pytest.mark.skipif("ver(psycopg.__version__) < ver('3.0.8')") async def test_no_queue_timeout(deaf_port): - async with AsyncNullConnectionPool( + async with pool.AsyncNullConnectionPool( kwargs={"host": "localhost", "port": deaf_port} ) as p: - with pytest.raises(PoolTimeout): + with pytest.raises(pool.PoolTimeout): async with p.connection(timeout=1): pass -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue(dsn): - async def worker(n): - t0 = time() - async with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - async with AsyncNullConnectionPool(dsn, max_size=2) as p: - await p.wait() - ts = [create_task(worker(i)) for i in range(6)] - await asyncio.gather(*ts) - - times = [item[1] for item in results] - want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6] - for got, want in zip(times, want_times): - assert got == pytest.approx(want, 0.2), times - - assert len(set(r[2] for r in results)) == 2, results - - -@pytest.mark.slow -async def test_queue_size(dsn): - async def worker(t, ev=None): - try: - async with p.connection(): - if ev: - ev.set() - await asyncio.sleep(t) - except TooManyRequests as e: - errors.append(e) - else: - success.append(True) - - errors: List[Exception] = [] - success: List[bool] = [] - - async with AsyncNullConnectionPool(dsn, max_size=1, max_waiting=3) as p: - await p.wait() - ev = asyncio.Event() - create_task(worker(0.3, ev)) - await ev.wait() - - ts = [create_task(worker(0.1)) for i in range(4)] - await asyncio.gather(*ts) - - assert len(success) == 4 - assert len(errors) == 1 - assert isinstance(errors[0], TooManyRequests) - assert p.name in str(errors[0]) - assert str(p.max_waiting) in str(errors[0]) - assert p.get_stats()["requests_errors"] == 1 - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue_timeout(dsn): - async def worker(n): - t0 = time() - try: - async with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - 
results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p: - ts = [create_task(worker(i)) for i in range(4)] - await asyncio.gather(*ts) - - assert len(results) == 2 - assert len(errors) == 2 - for e in errors: - assert 0.1 < e[1] < 0.15 - - -@pytest.mark.slow -@pytest.mark.timing -async def test_dead_client(dsn): - async def worker(i, timeout): - try: - async with p.connection(timeout=timeout) as conn: - await conn.execute("select pg_sleep(0.3)") - results.append(i) - except PoolTimeout: - if timeout > 0.2: - raise - - async with AsyncNullConnectionPool(dsn, max_size=2) as p: - results: List[int] = [] - ts = [ - create_task(worker(i, timeout)) - for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4]) - ] - await asyncio.gather(*ts) - - await asyncio.sleep(0.2) - assert set(results) == set([0, 1, 3, 4]) - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue_timeout_override(dsn): - async def worker(n): - t0 = time() - timeout = 0.25 if n == 3 else None - try: - async with p.connection(timeout=timeout) as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p: - ts = [create_task(worker(i)) for i in range(4)] - await asyncio.gather(*ts) - - assert len(results) == 3 - assert len(errors) == 1 - for e in errors: - assert 0.1 < e[1] < 0.15 - - -@pytest.mark.crdb_skip("backend pid") -async def test_broken_reconnect(dsn): - async with AsyncNullConnectionPool(dsn, max_size=1) as p: - async with p.connection() as conn: - pid1 = conn.info.backend_pid - await conn.close() - - async with p.connection() as conn2: - pid2 = conn2.info.backend_pid - - assert pid1 != pid2 - - @pytest.mark.crdb_skip("backend pid") async def test_intrans_rollback(dsn, caplog): caplog.set_level(logging.WARNING, logger="psycopg.pool") @@ -511,7 +274,7 @@ async def test_intrans_rollback(dsn, caplog): ) assert not await cur.fetchone() - async with AsyncNullConnectionPool(dsn, max_size=1) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() # Queue the worker so it will take the connection a second time instead @@ -540,7 +303,7 @@ async def test_inerror_rollback(dsn, caplog): pids.append(conn.info.backend_pid) assert conn.info.transaction_status == TransactionStatus.IDLE - async with AsyncNullConnectionPool(dsn, max_size=1) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = create_task(worker()) @@ -569,7 +332,7 @@ async def test_active_close(dsn, caplog): pids.append(conn.info.backend_pid) assert conn.info.transaction_status == TransactionStatus.IDLE - async with AsyncNullConnectionPool(dsn, max_size=1) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = create_task(worker()) @@ -597,7 +360,7 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): pids.append(conn.info.backend_pid) assert conn.info.transaction_status == TransactionStatus.IDLE - async with AsyncNullConnectionPool(dsn, max_size=1) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = 
create_task(worker()) await ensure_waiting(p) @@ -624,170 +387,16 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): assert "BAD" in caplog.records[2].message -async def test_close_no_tasks(dsn): - p = AsyncNullConnectionPool(dsn) - assert p._sched_runner and not p._sched_runner.done() - assert p._workers - workers = p._workers[:] - for t in workers: - assert not t.done() - - await p.close() - assert p._sched_runner is None - assert not p._workers - for t in workers: - assert t.done() - - -async def test_putconn_no_pool(aconn_cls, dsn): - async with AsyncNullConnectionPool(dsn) as p: - conn = await aconn_cls.connect(dsn) - with pytest.raises(ValueError): - await p.putconn(conn) - - await conn.close() - - -async def test_putconn_wrong_pool(dsn): - async with AsyncNullConnectionPool(dsn) as p1: - async with AsyncNullConnectionPool(dsn) as p2: - conn = await p1.getconn() - with pytest.raises(ValueError): - await p2.putconn(conn) - - -async def test_closed_getconn(dsn): - p = AsyncNullConnectionPool(dsn) - assert not p.closed - async with p.connection(): - pass - - await p.close() - assert p.closed - - with pytest.raises(PoolClosed): - async with p.connection(): - pass - - async def test_closed_putconn(dsn): - p = AsyncNullConnectionPool(dsn) - - async with p.connection() as conn: - pass - assert conn.closed - - async with p.connection() as conn: - await p.close() - assert conn.closed - - -async def test_closed_queue(dsn): - async def w1(): + async with pool.AsyncNullConnectionPool(dsn) as p: async with p.connection() as conn: - e1.set() # Tell w0 that w1 got a connection - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - await e2.wait() # Wait until w0 has tested w2 - success.append("w1") - - async def w2(): - try: - async with p.connection(): - pass # unexpected - except PoolClosed: - success.append("w2") - - e1 = asyncio.Event() - e2 = asyncio.Event() - - p = AsyncNullConnectionPool(dsn, max_size=1) - await p.wait() - success: List[str] = [] - - t1 = create_task(w1()) - # Wait until w1 has received a connection - await e1.wait() - - t2 = create_task(w2()) - # Wait until w2 is in the queue - await ensure_waiting(p) - await p.close() - - # Wait for the workers to finish - e2.set() - await asyncio.gather(t1, t2) - assert len(success) == 2 - - -async def test_open_explicit(dsn): - p = AsyncNullConnectionPool(dsn, open=False) - assert p.closed - with pytest.raises(PoolClosed): - await p.getconn() - - with pytest.raises(PoolClosed, match="is not open yet"): - async with p.connection(): pass - - await p.open() - try: - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - finally: - await p.close() - - with pytest.raises(PoolClosed, match="is already closed"): - await p.getconn() - - -async def test_open_context(dsn): - p = AsyncNullConnectionPool(dsn, open=False) - assert p.closed - - async with p: - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - assert p.closed - - -async def test_open_no_op(dsn): - p = AsyncNullConnectionPool(dsn) - try: - assert not p.closed - await p.open() - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - finally: - await p.close() - - -async def test_reopen(dsn): - p = AsyncNullConnectionPool(dsn) - async with p.connection() as conn: - await conn.execute("select 1") 
- await p.close() - assert p._sched_runner is None - - with pytest.raises(psycopg.OperationalError, match="cannot be reused"): - await p.open() + assert conn.closed @pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)]) async def test_bad_resize(dsn, min_size, max_size): - async with AsyncNullConnectionPool() as p: + async with pool.AsyncNullConnectionPool() as p: with pytest.raises(ValueError): await p.resize(min_size=min_size, max_size=max_size) @@ -803,7 +412,7 @@ async def test_max_lifetime(dsn): pids.append(conn.info.backend_pid) await asyncio.sleep(0.1) - async with AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p: + async with pool.AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p: ts = [create_task(worker()) for i in range(5)] await asyncio.gather(*ts) @@ -812,88 +421,15 @@ async def test_max_lifetime(dsn): async def test_check(dsn): # no.op - async with AsyncNullConnectionPool(dsn) as p: + async with pool.AsyncNullConnectionPool(dsn) as p: await p.check() -@pytest.mark.slow -@pytest.mark.timing -async def test_stats_measures(dsn): - async def worker(n): - async with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - - async with AsyncNullConnectionPool(dsn, max_size=4) as p: - await p.wait(2.0) - - stats = p.get_stats() - assert stats["pool_min"] == 0 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 0 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 0 - - ts = [create_task(worker(i)) for i in range(3)] - await asyncio.sleep(0.1) - stats = p.get_stats() - await asyncio.gather(*ts) - assert stats["pool_min"] == 0 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 3 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 0 - - await p.wait(2.0) - ts = [create_task(worker(i)) for i in range(7)] - await asyncio.sleep(0.1) - stats = p.get_stats() - await asyncio.gather(*ts) - assert stats["pool_min"] == 0 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 4 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 3 - - -@pytest.mark.slow -@pytest.mark.timing -async def test_stats_usage(dsn): - async def worker(n): - try: - async with p.connection(timeout=0.3) as conn: - await conn.execute("select pg_sleep(0.2)") - except PoolTimeout: - pass - - async with AsyncNullConnectionPool(dsn, max_size=3) as p: - await p.wait(2.0) - - ts = [create_task(worker(i)) for i in range(7)] - await asyncio.gather(*ts) - stats = p.get_stats() - assert stats["requests_num"] == 7 - assert stats["requests_queued"] == 4 - assert 850 <= stats["requests_wait_ms"] <= 950 - assert stats["requests_errors"] == 1 - assert 1150 <= stats["usage_ms"] <= 1250 - assert stats.get("returns_bad", 0) == 0 - - async with p.connection() as conn: - await conn.close() - await p.wait() - stats = p.pop_stats() - assert stats["requests_num"] == 8 - assert stats["returns_bad"] == 1 - async with p.connection(): - pass - assert p.get_stats()["requests_num"] == 1 - - @pytest.mark.slow async def test_stats_connect(dsn, proxy, monkeypatch): proxy.start() delay_connection(monkeypatch, 0.2) - async with AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p: + async with pool.AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p: await p.wait() stats = p.get_stats() assert stats["connections_num"] == 1 @@ -907,7 +443,7 @@ async def test_cancellation_in_queue(dsn): nconns = 3 - async with AsyncNullConnectionPool( + async with pool.AsyncNullConnectionPool( 
dsn, min_size=0, max_size=nconns, timeout=1 ) as p: await p.wait() diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 2032fab89..fc353069e 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -13,7 +13,7 @@ from psycopg.pq import TransactionStatus from psycopg.rows import class_row, Row, TupleRow from psycopg._compat import assert_type, Counter -from ..utils import Event, spawn, gather, sleep, is_alive, is_async +from ..utils import Event, spawn, gather, sleep, is_async try: import psycopg_pool as pool @@ -22,13 +22,9 @@ except ImportError: pass -def test_defaults(dsn): +def test_default_sizes(dsn): with pool.ConnectionPool(dsn) as p: assert p.min_size == p.max_size == 4 - assert p.timeout == 30 - assert p.max_idle == 10 * 60 - assert p.max_lifetime == 60 * 60 - assert p.num_workers == 3 @pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)]) @@ -44,21 +40,6 @@ def test_bad_size(dsn, min_size, max_size): pool.ConnectionPool(min_size=min_size, max_size=max_size) -def test_connection_class(dsn): - class MyConn(psycopg.Connection[Any]): - pass - - with pool.ConnectionPool(dsn, connection_class=MyConn, min_size=1) as p: - with p.connection() as conn: - assert isinstance(conn, MyConn) - - -def test_kwargs(dsn): - with pool.ConnectionPool(dsn, kwargs={"autocommit": True}, min_size=1) as p: - with p.connection() as conn: - assert conn.autocommit - - class MyRow(Dict[str, Any]): ... @@ -130,12 +111,6 @@ def test_its_really_a_pool(dsn): assert conn.info.backend_pid in (pid1, pid2) -def test_context(dsn): - with pool.ConnectionPool(dsn, min_size=1) as p: - assert not p.closed - assert p.closed - - @pytest.mark.crdb_skip("backend pid") def test_connection_not_lost(dsn): with pool.ConnectionPool(dsn, min_size=1) as p: @@ -187,29 +162,6 @@ def test_wait_ready(dsn, monkeypatch): p.wait(0.0001) # idempotent -def test_wait_closed(dsn): - with pool.ConnectionPool(dsn) as p: - pass - - with pytest.raises(pool.PoolClosed): - p.wait() - - -@pytest.mark.slow -def test_setup_no_timeout(dsn, proxy): - with pytest.raises(pool.PoolTimeout): - with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p: - p.wait(0.2) - - with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p: - sleep(0.5) - assert not p._pool - proxy.start() - - with p.connection() as conn: - conn.execute("select 1") - - def test_configure(dsn): inits = 0 @@ -238,37 +190,6 @@ def test_configure(dsn): assert res.fetchone()[0] == "on" # type: ignore[index] -@pytest.mark.slow -def test_configure_badstate(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - def configure(conn): - conn.execute("select 1") - - with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p: - with pytest.raises(pool.PoolTimeout): - p.wait(timeout=0.5) - - assert caplog.records - assert "INTRANS" in caplog.records[0].message - - -@pytest.mark.slow -def test_configure_broken(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - def configure(conn): - with conn.transaction(): - conn.execute("WAT") - - with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p: - with pytest.raises(pool.PoolTimeout): - p.wait(timeout=0.5) - - assert caplog.records - assert "WAT" in caplog.records[0].message - - def test_reset(dsn): resets = 0 @@ -341,164 +262,6 @@ def test_reset_broken(dsn, caplog): assert "WAT" in caplog.records[0].message -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -def test_queue(dsn): - 
def worker(n): - t0 = time() - with p.connection() as conn: - conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - with pool.ConnectionPool(dsn, min_size=2) as p: - p.wait() - ts = [spawn(worker, args=(i,)) for i in range(6)] - gather(*ts) - - times = [item[1] for item in results] - want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6] - for got, want in zip(times, want_times): - assert got == pytest.approx(want, 0.1), times - - assert len(set((r[2] for r in results))) == 2, results - - -@pytest.mark.slow -def test_queue_size(dsn): - def worker(t, ev=None): - try: - with p.connection(): - if ev: - ev.set() - sleep(t) - except pool.TooManyRequests as e: - errors.append(e) - else: - success.append(True) - - errors: List[Exception] = [] - success: List[bool] = [] - - with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p: - p.wait() - ev = Event() - spawn(worker, args=(0.3, ev)) - ev.wait() - - ts = [spawn(worker, args=(0.1,)) for i in range(4)] - gather(*ts) - - assert len(success) == 4 - assert len(errors) == 1 - assert isinstance(errors[0], pool.TooManyRequests) - assert p.name in str(errors[0]) - assert str(p.max_waiting) in str(errors[0]) - assert p.get_stats()["requests_errors"] == 1 - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -def test_queue_timeout(dsn): - def worker(n): - t0 = time() - try: - with p.connection() as conn: - conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except pool.PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p: - ts = [spawn(worker, args=(i,)) for i in range(4)] - gather(*ts) - - assert len(results) == 2 - assert len(errors) == 2 - for e in errors: - assert 0.1 < e[1] < 0.15 - - -@pytest.mark.slow -@pytest.mark.timing -def test_dead_client(dsn): - def worker(i, timeout): - try: - with p.connection(timeout=timeout) as conn: - conn.execute("select pg_sleep(0.3)") - results.append(i) - except pool.PoolTimeout: - if timeout > 0.2: - raise - - with pool.ConnectionPool(dsn, min_size=2) as p: - results: List[int] = [] - ts = [ - spawn(worker, args=(i, timeout)) - for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4]) - ] - gather(*ts) - - sleep(0.2) - assert set(results) == set([0, 1, 3, 4]) - assert len(p._pool) == 2 # no connection was lost - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -def test_queue_timeout_override(dsn): - def worker(n): - t0 = time() - timeout = 0.25 if n == 3 else None - try: - with p.connection(timeout=timeout) as conn: - conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except pool.PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p: - ts = [spawn(worker, args=(i,)) for i in range(4)] - gather(*ts) - - assert len(results) == 3 - assert len(errors) == 1 - for e in errors: - assert 0.1 < e[1] < 0.15 - - -@pytest.mark.crdb_skip("backend pid") -def test_broken_reconnect(dsn): - with pool.ConnectionPool(dsn, min_size=1) as p: - with p.connection() as conn: - pid1 = 
conn.info.backend_pid - conn.close() - - with p.connection() as conn2: - pid2 = conn2.info.backend_pid - - assert pid1 != pid2 - - @pytest.mark.crdb_skip("backend pid") def test_intrans_rollback(dsn, caplog): caplog.set_level(logging.WARNING, logger="psycopg.pool") @@ -594,38 +357,6 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): assert "BAD" in caplog.records[2].message -def test_close_no_tasks(dsn): - p = pool.ConnectionPool(dsn) - assert p._sched_runner and is_alive(p._sched_runner) - workers = p._workers[:] - assert workers - for t in workers: - assert is_alive(t) - - p.close() - assert p._sched_runner is None - assert not p._workers - for t in workers: - assert not is_alive(t) - - -def test_putconn_no_pool(conn_cls, dsn): - with pool.ConnectionPool(dsn, min_size=1) as p: - conn = conn_cls.connect(dsn) - with pytest.raises(ValueError): - p.putconn(conn) - - conn.close() - - -def test_putconn_wrong_pool(dsn): - with pool.ConnectionPool(dsn, min_size=1) as p1: - with pool.ConnectionPool(dsn, min_size=1) as p2: - conn = p1.getconn() - with pytest.raises(ValueError): - p2.putconn(conn) - - def test_del_no_warning(dsn, recwarn): p = pool.ConnectionPool(dsn, min_size=2) with p.connection() as conn: @@ -638,132 +369,11 @@ def test_del_no_warning(dsn, recwarn): assert not recwarn, [str(w.message) for w in recwarn.list] -@pytest.mark.slow -@pytest.mark.skipif(is_async(__name__), reason="sync test only") -def test_del_stops_threads(dsn): - p = pool.ConnectionPool(dsn) - assert p._sched_runner is not None - ts = [p._sched_runner] + p._workers - del p - sleep(0.1) - for t in ts: - assert not is_alive(t), t - - -def test_closed_getconn(dsn): - p = pool.ConnectionPool(dsn, min_size=1) - assert not p.closed - with p.connection(): - pass - - p.close() - assert p.closed - - with pytest.raises(pool.PoolClosed): - with p.connection(): - pass - - def test_closed_putconn(dsn): - p = pool.ConnectionPool(dsn, min_size=1) - - with p.connection() as conn: - pass - assert not conn.closed - - with p.connection() as conn: - p.close() - assert conn.closed - - -def test_closed_queue(dsn): - def w1(): + with pool.ConnectionPool(dsn, min_size=1) as p: with p.connection() as conn: - e1.set() # Tell w0 that w1 got a connection - cur = conn.execute("select 1") - assert cur.fetchone() == (1,) - e2.wait() # Wait until w0 has tested w2 - success.append("w1") - - def w2(): - try: - with p.connection(): - pass # unexpected - except pool.PoolClosed: - success.append("w2") - - e1 = Event() - e2 = Event() - - p = pool.ConnectionPool(dsn, min_size=1) - p.wait() - success: List[str] = [] - - t1 = spawn(w1) - # Wait until w1 has received a connection - e1.wait() - - t2 = spawn(w2) - # Wait until w2 is in the queue - ensure_waiting(p) - p.close() - - # Wait for the workers to finish - e2.set() - gather(t1, t2) - assert len(success) == 2 - - -def test_open_explicit(dsn): - p = pool.ConnectionPool(dsn, open=False) - assert p.closed - with pytest.raises(pool.PoolClosed, match="is not open yet"): - p.getconn() - - with pytest.raises(pool.PoolClosed, match="is not open yet"): - with p.connection(): pass - - p.open() - try: - assert not p.closed - - with p.connection() as conn: - cur = conn.execute("select 1") - assert cur.fetchone() == (1,) - finally: - p.close() - - with pytest.raises(pool.PoolClosed, match="is already closed"): - p.getconn() - - -def test_open_context(dsn): - p = pool.ConnectionPool(dsn, open=False) - assert p.closed - - with p: - assert not p.closed - - with p.connection() as conn: - cur = 
conn.execute("select 1") - assert cur.fetchone() == (1,) - - assert p.closed - - -def test_open_no_op(dsn): - p = pool.ConnectionPool(dsn) - try: - assert not p.closed - p.open() - assert not p.closed - - with p.connection() as conn: - cur = conn.execute("select 1") - assert cur.fetchone() == (1,) - finally: - p.close() + assert not conn.closed @pytest.mark.slow @@ -796,18 +406,6 @@ def test_open_as_wait(dsn, monkeypatch): p.open(wait=True, timeout=0.5) -def test_reopen(dsn): - p = pool.ConnectionPool(dsn) - with p.connection() as conn: - conn.execute("select 1") - p.close() - assert p._sched_runner is None - assert not p._workers - - with pytest.raises(psycopg.OperationalError, match="cannot be reused"): - p.open() - - @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize( @@ -1084,12 +682,6 @@ def test_bad_resize(dsn, min_size, max_size): p.resize(min_size=min_size, max_size=max_size) -def test_jitter(): - rnds = [pool.ConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)] - assert 27 <= min(rnds) <= 28 - assert 35 < max(rnds) < 36 - - @pytest.mark.slow @pytest.mark.timing @pytest.mark.crdb_skip("backend pid") @@ -1148,79 +740,6 @@ def test_check_max_lifetime(dsn): assert conn.info.backend_pid != pid -@pytest.mark.slow -@pytest.mark.timing -def test_stats_measures(dsn): - def worker(n): - with p.connection() as conn: - conn.execute("select pg_sleep(0.2)") - - with pool.ConnectionPool(dsn, min_size=2, max_size=4) as p: - p.wait(2.0) - - stats = p.get_stats() - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 2 - assert stats["pool_available"] == 2 - assert stats["requests_waiting"] == 0 - - ts = [spawn(worker, args=(i,)) for i in range(3)] - sleep(0.1) - stats = p.get_stats() - gather(*ts) - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 3 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 0 - - p.wait(2.0) - ts = [spawn(worker, args=(i,)) for i in range(7)] - sleep(0.1) - stats = p.get_stats() - gather(*ts) - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 4 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 3 - - -@pytest.mark.slow -@pytest.mark.timing -def test_stats_usage(dsn): - def worker(n): - try: - with p.connection(timeout=0.3) as conn: - conn.execute("select pg_sleep(0.2)") - except pool.PoolTimeout: - pass - - with pool.ConnectionPool(dsn, min_size=3) as p: - p.wait(2.0) - - ts = [spawn(worker, args=(i,)) for i in range(7)] - gather(*ts) - stats = p.get_stats() - assert stats["requests_num"] == 7 - assert stats["requests_queued"] == 4 - assert 850 <= stats["requests_wait_ms"] <= 950 - assert stats["requests_errors"] == 1 - assert 1150 <= stats["usage_ms"] <= 1250 - assert stats.get("returns_bad", 0) == 0 - - with p.connection() as conn: - conn.close() - p.wait() - stats = p.pop_stats() - assert stats["requests_num"] == 8 - assert stats["returns_bad"] == 1 - with p.connection(): - pass - assert p.get_stats()["requests_num"] == 1 - - @pytest.mark.slow def test_stats_connect(dsn, proxy, monkeypatch): proxy.start() diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 92a8f7bf5..ef6db8d22 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -10,7 +10,7 @@ from psycopg.pq import TransactionStatus from psycopg.rows import class_row, Row, TupleRow from psycopg._compat import assert_type, Counter -from ..utils import AEvent, spawn, 
gather, asleep, is_alive, is_async +from ..utils import AEvent, spawn, gather, asleep, is_async try: import psycopg_pool as pool @@ -22,13 +22,9 @@ if True: # ASYNC pytestmark = [pytest.mark.anyio] -async def test_defaults(dsn): +async def test_default_sizes(dsn): async with pool.AsyncConnectionPool(dsn) as p: assert p.min_size == p.max_size == 4 - assert p.timeout == 30 - assert p.max_idle == 10 * 60 - assert p.max_lifetime == 60 * 60 - assert p.num_workers == 3 @pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)]) @@ -44,23 +40,6 @@ async def test_bad_size(dsn, min_size, max_size): pool.AsyncConnectionPool(min_size=min_size, max_size=max_size) -async def test_connection_class(dsn): - class MyConn(psycopg.AsyncConnection[Any]): - pass - - async with pool.AsyncConnectionPool(dsn, connection_class=MyConn, min_size=1) as p: - async with p.connection() as conn: - assert isinstance(conn, MyConn) - - -async def test_kwargs(dsn): - async with pool.AsyncConnectionPool( - dsn, kwargs={"autocommit": True}, min_size=1 - ) as p: - async with p.connection() as conn: - assert conn.autocommit - - class MyRow(Dict[str, Any]): ... @@ -136,12 +115,6 @@ async def test_its_really_a_pool(dsn): assert conn.info.backend_pid in (pid1, pid2) -async def test_context(dsn): - async with pool.AsyncConnectionPool(dsn, min_size=1) as p: - assert not p.closed - assert p.closed - - @pytest.mark.crdb_skip("backend pid") async def test_connection_not_lost(dsn): async with pool.AsyncConnectionPool(dsn, min_size=1) as p: @@ -193,33 +166,6 @@ async def test_wait_ready(dsn, monkeypatch): await p.wait(0.0001) # idempotent -async def test_wait_closed(dsn): - async with pool.AsyncConnectionPool(dsn) as p: - pass - - with pytest.raises(pool.PoolClosed): - await p.wait() - - -@pytest.mark.slow -async def test_setup_no_timeout(dsn, proxy): - with pytest.raises(pool.PoolTimeout): - async with pool.AsyncConnectionPool( - proxy.client_dsn, min_size=1, num_workers=1 - ) as p: - await p.wait(0.2) - - async with pool.AsyncConnectionPool( - proxy.client_dsn, min_size=1, num_workers=1 - ) as p: - await asleep(0.5) - assert not p._pool - proxy.start() - - async with p.connection() as conn: - await conn.execute("select 1") - - async def test_configure(dsn): inits = 0 @@ -248,37 +194,6 @@ async def test_configure(dsn): assert (await res.fetchone())[0] == "on" # type: ignore[index] -@pytest.mark.slow -async def test_configure_badstate(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - async def configure(conn): - await conn.execute("select 1") - - async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p: - with pytest.raises(pool.PoolTimeout): - await p.wait(timeout=0.5) - - assert caplog.records - assert "INTRANS" in caplog.records[0].message - - -@pytest.mark.slow -async def test_configure_broken(dsn, caplog): - caplog.set_level(logging.WARNING, logger="psycopg.pool") - - async def configure(conn): - async with conn.transaction(): - await conn.execute("WAT") - - async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p: - with pytest.raises(pool.PoolTimeout): - await p.wait(timeout=0.5) - - assert caplog.records - assert "WAT" in caplog.records[0].message - - async def test_reset(dsn): resets = 0 @@ -351,164 +266,6 @@ async def test_reset_broken(dsn, caplog): assert "WAT" in caplog.records[0].message -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue(dsn): - async def worker(n): - t0 = time() - async 
with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - async with pool.AsyncConnectionPool(dsn, min_size=2) as p: - await p.wait() - ts = [spawn(worker, args=(i,)) for i in range(6)] - await gather(*ts) - - times = [item[1] for item in results] - want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6] - for got, want in zip(times, want_times): - assert got == pytest.approx(want, 0.1), times - - assert len(set(r[2] for r in results)) == 2, results - - -@pytest.mark.slow -async def test_queue_size(dsn): - async def worker(t, ev=None): - try: - async with p.connection(): - if ev: - ev.set() - await asleep(t) - except pool.TooManyRequests as e: - errors.append(e) - else: - success.append(True) - - errors: List[Exception] = [] - success: List[bool] = [] - - async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p: - await p.wait() - ev = AEvent() - spawn(worker, args=(0.3, ev)) - await ev.wait() - - ts = [spawn(worker, args=(0.1,)) for i in range(4)] - await gather(*ts) - - assert len(success) == 4 - assert len(errors) == 1 - assert isinstance(errors[0], pool.TooManyRequests) - assert p.name in str(errors[0]) - assert str(p.max_waiting) in str(errors[0]) - assert p.get_stats()["requests_errors"] == 1 - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue_timeout(dsn): - async def worker(n): - t0 = time() - try: - async with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except pool.PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p: - ts = [spawn(worker, args=(i,)) for i in range(4)] - await gather(*ts) - - assert len(results) == 2 - assert len(errors) == 2 - for e in errors: - assert 0.1 < e[1] < 0.15 - - -@pytest.mark.slow -@pytest.mark.timing -async def test_dead_client(dsn): - async def worker(i, timeout): - try: - async with p.connection(timeout=timeout) as conn: - await conn.execute("select pg_sleep(0.3)") - results.append(i) - except pool.PoolTimeout: - if timeout > 0.2: - raise - - async with pool.AsyncConnectionPool(dsn, min_size=2) as p: - results: List[int] = [] - ts = [ - spawn(worker, args=(i, timeout)) - for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4]) - ] - await gather(*ts) - - await asleep(0.2) - assert set(results) == set([0, 1, 3, 4]) - assert len(p._pool) == 2 # no connection was lost - - -@pytest.mark.slow -@pytest.mark.timing -@pytest.mark.crdb_skip("backend pid") -async def test_queue_timeout_override(dsn): - async def worker(n): - t0 = time() - timeout = 0.25 if n == 3 else None - try: - async with p.connection(timeout=timeout) as conn: - await conn.execute("select pg_sleep(0.2)") - pid = conn.info.backend_pid - except pool.PoolTimeout as e: - t1 = time() - errors.append((n, t1 - t0, e)) - else: - t1 = time() - results.append((n, t1 - t0, pid)) - - results: List[Tuple[int, float, int]] = [] - errors: List[Tuple[int, float, Exception]] = [] - - async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p: - ts = [spawn(worker, args=(i,)) for i in range(4)] - await gather(*ts) - - assert len(results) == 3 - assert len(errors) == 1 - for e in errors: - assert 0.1 < 
e[1] < 0.15 - - -@pytest.mark.crdb_skip("backend pid") -async def test_broken_reconnect(dsn): - async with pool.AsyncConnectionPool(dsn, min_size=1) as p: - async with p.connection() as conn: - pid1 = conn.info.backend_pid - await conn.close() - - async with p.connection() as conn2: - pid2 = conn2.info.backend_pid - - assert pid1 != pid2 - - @pytest.mark.crdb_skip("backend pid") async def test_intrans_rollback(dsn, caplog): caplog.set_level(logging.WARNING, logger="psycopg.pool") @@ -604,38 +361,6 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): assert "BAD" in caplog.records[2].message -async def test_close_no_tasks(dsn): - p = pool.AsyncConnectionPool(dsn) - assert p._sched_runner and is_alive(p._sched_runner) - workers = p._workers[:] - assert workers - for t in workers: - assert is_alive(t) - - await p.close() - assert p._sched_runner is None - assert not p._workers - for t in workers: - assert not is_alive(t) - - -async def test_putconn_no_pool(aconn_cls, dsn): - async with pool.AsyncConnectionPool(dsn, min_size=1) as p: - conn = await aconn_cls.connect(dsn) - with pytest.raises(ValueError): - await p.putconn(conn) - - await conn.close() - - -async def test_putconn_wrong_pool(dsn): - async with pool.AsyncConnectionPool(dsn, min_size=1) as p1: - async with pool.AsyncConnectionPool(dsn, min_size=1) as p2: - conn = await p1.getconn() - with pytest.raises(ValueError): - await p2.putconn(conn) - - async def test_del_no_warning(dsn, recwarn): p = pool.AsyncConnectionPool(dsn, min_size=2) async with p.connection() as conn: @@ -648,134 +373,11 @@ async def test_del_no_warning(dsn, recwarn): assert not recwarn, [str(w.message) for w in recwarn.list] -@pytest.mark.slow -@pytest.mark.skipif(is_async(__name__), reason="sync test only") -async def test_del_stops_threads(dsn): - p = pool.AsyncConnectionPool(dsn) - assert p._sched_runner is not None - ts = [p._sched_runner] + p._workers - del p - await asleep(0.1) - for t in ts: - assert not is_alive(t), t - - -async def test_closed_getconn(dsn): - p = pool.AsyncConnectionPool(dsn, min_size=1) - assert not p.closed - async with p.connection(): - pass - - await p.close() - assert p.closed - - with pytest.raises(pool.PoolClosed): - async with p.connection(): - pass - - async def test_closed_putconn(dsn): - p = pool.AsyncConnectionPool(dsn, min_size=1) - - async with p.connection() as conn: - pass - assert not conn.closed - - async with p.connection() as conn: - await p.close() - assert conn.closed - - -async def test_closed_queue(dsn): - async def w1(): + async with pool.AsyncConnectionPool(dsn, min_size=1) as p: async with p.connection() as conn: - e1.set() # Tell w0 that w1 got a connection - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - await e2.wait() # Wait until w0 has tested w2 - success.append("w1") - - async def w2(): - try: - async with p.connection(): - pass # unexpected - except pool.PoolClosed: - success.append("w2") - - e1 = AEvent() - e2 = AEvent() - - p = pool.AsyncConnectionPool(dsn, min_size=1) - await p.wait() - success: List[str] = [] - - t1 = spawn(w1) - # Wait until w1 has received a connection - await e1.wait() - - t2 = spawn(w2) - # Wait until w2 is in the queue - await ensure_waiting(p) - await p.close() - - # Wait for the workers to finish - e2.set() - await gather(t1, t2) - assert len(success) == 2 - - -async def test_open_explicit(dsn): - p = pool.AsyncConnectionPool(dsn, open=False) - assert p.closed - with pytest.raises(pool.PoolClosed, match="is not open yet"): - await 
p.getconn() - - with pytest.raises(pool.PoolClosed, match="is not open yet"): - async with p.connection(): pass - - await p.open() - try: - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - finally: - await p.close() - - with pytest.raises(pool.PoolClosed, match="is already closed"): - await p.getconn() - - -async def test_open_context(dsn): - p = pool.AsyncConnectionPool(dsn, open=False) - assert p.closed - - async with p: - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - assert p.closed - - -async def test_open_no_op(dsn): - p = pool.AsyncConnectionPool(dsn) - try: - assert not p.closed - await p.open() - assert not p.closed - - async with p.connection() as conn: - cur = await conn.execute("select 1") - assert await cur.fetchone() == (1,) - - finally: - await p.close() + assert not conn.closed @pytest.mark.slow @@ -808,18 +410,6 @@ async def test_open_as_wait(dsn, monkeypatch): await p.open(wait=True, timeout=0.5) -async def test_reopen(dsn): - p = pool.AsyncConnectionPool(dsn) - async with p.connection() as conn: - await conn.execute("select 1") - await p.close() - assert p._sched_runner is None - assert not p._workers - - with pytest.raises(psycopg.OperationalError, match="cannot be reused"): - await p.open() - - @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize( @@ -1098,12 +688,6 @@ async def test_bad_resize(dsn, min_size, max_size): await p.resize(min_size=min_size, max_size=max_size) -async def test_jitter(): - rnds = [pool.AsyncConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)] - assert 27 <= min(rnds) <= 28 - assert 35 < max(rnds) < 36 - - @pytest.mark.slow @pytest.mark.timing @pytest.mark.crdb_skip("backend pid") @@ -1162,79 +746,6 @@ async def test_check_max_lifetime(dsn): assert conn.info.backend_pid != pid -@pytest.mark.slow -@pytest.mark.timing -async def test_stats_measures(dsn): - async def worker(n): - async with p.connection() as conn: - await conn.execute("select pg_sleep(0.2)") - - async with pool.AsyncConnectionPool(dsn, min_size=2, max_size=4) as p: - await p.wait(2.0) - - stats = p.get_stats() - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 2 - assert stats["pool_available"] == 2 - assert stats["requests_waiting"] == 0 - - ts = [spawn(worker, args=(i,)) for i in range(3)] - await asleep(0.1) - stats = p.get_stats() - await gather(*ts) - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 3 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 0 - - await p.wait(2.0) - ts = [spawn(worker, args=(i,)) for i in range(7)] - await asleep(0.1) - stats = p.get_stats() - await gather(*ts) - assert stats["pool_min"] == 2 - assert stats["pool_max"] == 4 - assert stats["pool_size"] == 4 - assert stats["pool_available"] == 0 - assert stats["requests_waiting"] == 3 - - -@pytest.mark.slow -@pytest.mark.timing -async def test_stats_usage(dsn): - async def worker(n): - try: - async with p.connection(timeout=0.3) as conn: - await conn.execute("select pg_sleep(0.2)") - except pool.PoolTimeout: - pass - - async with pool.AsyncConnectionPool(dsn, min_size=3) as p: - await p.wait(2.0) - - ts = [spawn(worker, args=(i,)) for i in range(7)] - await gather(*ts) - stats = p.get_stats() - assert stats["requests_num"] == 7 - assert stats["requests_queued"] == 4 - assert 850 <= 
stats["requests_wait_ms"] <= 950 - assert stats["requests_errors"] == 1 - assert 1150 <= stats["usage_ms"] <= 1250 - assert stats.get("returns_bad", 0) == 0 - - async with p.connection() as conn: - await conn.close() - await p.wait() - stats = p.pop_stats() - assert stats["requests_num"] == 8 - assert stats["returns_bad"] == 1 - async with p.connection(): - pass - assert p.get_stats()["requests_num"] == 1 - - @pytest.mark.slow async def test_stats_connect(dsn, proxy, monkeypatch): proxy.start() diff --git a/tests/pool/test_pool_common.py b/tests/pool/test_pool_common.py new file mode 100644 index 000000000..1ae013eff --- /dev/null +++ b/tests/pool/test_pool_common.py @@ -0,0 +1,621 @@ +# WARNING: this file is auto-generated by 'async_to_sync.py' +# from the original file 'test_pool_common_async.py' +# DO NOT CHANGE! Change the original file instead. +import logging +from time import time +from typing import Any, List, Tuple + +import pytest + +import psycopg + +from ..utils import Event, spawn, gather, sleep, is_alive, is_async + +try: + import psycopg_pool as pool +except ImportError: + # Tests should have been skipped if the package is not available + pass + + +@pytest.fixture(params=[pool.ConnectionPool, pool.NullConnectionPool]) +def pool_cls(request): + return request.param + + +def test_defaults(pool_cls, dsn): + with pool_cls(dsn) as p: + assert p.timeout == 30 + assert p.max_idle == 10 * 60 + assert p.max_lifetime == 60 * 60 + assert p.num_workers == 3 + + +def test_connection_class(pool_cls, dsn): + class MyConn(psycopg.Connection[Any]): + pass + + with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p: + with p.connection() as conn: + assert isinstance(conn, MyConn) + + +def test_kwargs(pool_cls, dsn): + with pool_cls(dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls)) as p: + with p.connection() as conn: + assert conn.autocommit + + +def test_context(pool_cls, dsn): + with pool_cls(dsn, min_size=min_size(pool_cls)) as p: + assert not p.closed + assert p.closed + + +def test_wait_closed(pool_cls, dsn): + with pool_cls(dsn) as p: + pass + + with pytest.raises(pool.PoolClosed): + p.wait() + + +@pytest.mark.slow +def test_setup_no_timeout(pool_cls, dsn, proxy): + with pytest.raises(pool.PoolTimeout): + with pool_cls( + proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1 + ) as p: + p.wait(0.2) + + with pool_cls(proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1) as p: + sleep(0.5) + assert not p._pool + proxy.start() + + with p.connection() as conn: + conn.execute("select 1") + + +@pytest.mark.slow +def test_configure_badstate(pool_cls, dsn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg.pool") + + def configure(conn): + conn.execute("select 1") + + with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p: + with pytest.raises(pool.PoolTimeout): + p.wait(timeout=0.5) + + assert caplog.records + assert "INTRANS" in caplog.records[0].message + + +@pytest.mark.slow +def test_configure_broken(pool_cls, dsn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg.pool") + + def configure(conn): + with conn.transaction(): + conn.execute("WAT") + + with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p: + with pytest.raises(pool.PoolTimeout): + p.wait(timeout=0.5) + + assert caplog.records + assert "WAT" in caplog.records[0].message + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +def test_queue(pool_cls, dsn): + def worker(n): + t0 = time() + with 
p.connection() as conn: + conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p: + p.wait() + ts = [spawn(worker, args=(i,)) for i in range(6)] + gather(*ts) + + times = [item[1] for item in results] + want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6] + for got, want in zip(times, want_times): + assert got == pytest.approx(want, 0.2), times + + assert len(set((r[2] for r in results))) == 2, results + + +@pytest.mark.slow +def test_queue_size(pool_cls, dsn): + def worker(t, ev=None): + try: + with p.connection(): + if ev: + ev.set() + sleep(t) + except pool.TooManyRequests as e: + errors.append(e) + else: + success.append(True) + + errors: List[Exception] = [] + success: List[bool] = [] + + with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3) as p: + p.wait() + ev = Event() + spawn(worker, args=(0.3, ev)) + ev.wait() + + ts = [spawn(worker, args=(0.1,)) for i in range(4)] + gather(*ts) + + assert len(success) == 4 + assert len(errors) == 1 + assert isinstance(errors[0], pool.TooManyRequests) + assert p.name in str(errors[0]) + assert str(p.max_waiting) in str(errors[0]) + assert p.get_stats()["requests_errors"] == 1 + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +def test_queue_timeout(pool_cls, dsn): + def worker(n): + t0 = time() + try: + with p.connection() as conn: + conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + except pool.PoolTimeout as e: + t1 = time() + errors.append((n, t1 - t0, e)) + else: + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + errors: List[Tuple[int, float, Exception]] = [] + + with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p: + ts = [spawn(worker, args=(i,)) for i in range(4)] + gather(*ts) + + assert len(results) == 2 + assert len(errors) == 2 + for e in errors: + assert 0.1 < e[1] < 0.15 + + +@pytest.mark.slow +@pytest.mark.timing +def test_dead_client(pool_cls, dsn): + def worker(i, timeout): + try: + with p.connection(timeout=timeout) as conn: + conn.execute("select pg_sleep(0.3)") + results.append(i) + except pool.PoolTimeout: + if timeout > 0.2: + raise + + with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p: + results: List[int] = [] + ts = [ + spawn(worker, args=(i, timeout)) + for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4]) + ] + gather(*ts) + + sleep(0.2) + assert set(results) == set([0, 1, 3, 4]) + if pool_cls is pool.ConnectionPool: + assert len(p._pool) == 2 # no connection was lost + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +def test_queue_timeout_override(pool_cls, dsn): + def worker(n): + t0 = time() + timeout = 0.25 if n == 3 else None + try: + with p.connection(timeout=timeout) as conn: + conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + except pool.PoolTimeout as e: + t1 = time() + errors.append((n, t1 - t0, e)) + else: + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + errors: List[Tuple[int, float, Exception]] = [] + + with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p: + ts = [spawn(worker, args=(i,)) for i in range(4)] + gather(*ts) + + assert len(results) == 3 + assert len(errors) == 1 + for e in errors: + assert 0.1 < e[1] < 0.15 + + 
+@pytest.mark.crdb_skip("backend pid") +def test_broken_reconnect(pool_cls, dsn): + with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p: + with p.connection() as conn: + pid1 = conn.info.backend_pid + conn.close() + + with p.connection() as conn2: + pid2 = conn2.info.backend_pid + + assert pid1 != pid2 + + +def test_close_no_tasks(pool_cls, dsn): + p = pool_cls(dsn) + assert p._sched_runner and is_alive(p._sched_runner) + workers = p._workers[:] + assert workers + for t in workers: + assert is_alive(t) + + p.close() + assert p._sched_runner is None + assert not p._workers + for t in workers: + assert not is_alive(t) + + +def test_putconn_no_pool(pool_cls, conn_cls, dsn): + with pool_cls(dsn, min_size=min_size(pool_cls)) as p: + conn = conn_cls.connect(dsn) + with pytest.raises(ValueError): + p.putconn(conn) + + conn.close() + + +def test_putconn_wrong_pool(pool_cls, dsn): + with pool_cls(dsn, min_size=min_size(pool_cls)) as p1: + with pool_cls(dsn, min_size=min_size(pool_cls)) as p2: + conn = p1.getconn() + with pytest.raises(ValueError): + p2.putconn(conn) + + +@pytest.mark.slow +@pytest.mark.skipif(is_async(__name__), reason="sync test only") +def test_del_stops_threads(pool_cls, dsn): + p = pool_cls(dsn) + assert p._sched_runner is not None + ts = [p._sched_runner] + p._workers + del p + sleep(0.1) + for t in ts: + assert not is_alive(t), t + + +def test_closed_getconn(pool_cls, dsn): + p = pool_cls(dsn, min_size=min_size(pool_cls)) + assert not p.closed + with p.connection(): + pass + + p.close() + assert p.closed + + with pytest.raises(pool.PoolClosed): + with p.connection(): + pass + + +def test_close_connection_on_pool_close(pool_cls, dsn): + p = pool_cls(dsn, min_size=min_size(pool_cls)) + with p.connection() as conn: + p.close() + assert conn.closed + + +def test_closed_queue(pool_cls, dsn): + def w1(): + with p.connection() as conn: + e1.set() # Tell w0 that w1 got a connection + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + e2.wait() # Wait until w0 has tested w2 + success.append("w1") + + def w2(): + try: + with p.connection(): + pass # unexpected + except pool.PoolClosed: + success.append("w2") + + e1 = Event() + e2 = Event() + + p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) + p.wait() + success: List[str] = [] + + t1 = spawn(w1) + # Wait until w1 has received a connection + e1.wait() + + t2 = spawn(w2) + # Wait until w2 is in the queue + ensure_waiting(p) + p.close() + + # Wait for the workers to finish + e2.set() + gather(t1, t2) + assert len(success) == 2 + + +def test_open_explicit(pool_cls, dsn): + p = pool_cls(dsn, open=False) + assert p.closed + with pytest.raises(pool.PoolClosed, match="is not open yet"): + p.getconn() + + with pytest.raises(pool.PoolClosed, match="is not open yet"): + with p.connection(): + pass + + p.open() + try: + assert not p.closed + + with p.connection() as conn: + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + finally: + p.close() + + with pytest.raises(pool.PoolClosed, match="is already closed"): + p.getconn() + + +def test_open_context(pool_cls, dsn): + p = pool_cls(dsn, open=False) + assert p.closed + + with p: + assert not p.closed + + with p.connection() as conn: + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + + assert p.closed + + +def test_open_no_op(pool_cls, dsn): + p = pool_cls(dsn) + try: + assert not p.closed + p.open() + assert not p.closed + + with p.connection() as conn: + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + finally: + 
p.close() + + +def test_reopen(pool_cls, dsn): + p = pool_cls(dsn) + with p.connection() as conn: + conn.execute("select 1") + p.close() + assert p._sched_runner is None + assert not p._workers + + with pytest.raises(psycopg.OperationalError, match="cannot be reused"): + p.open() + + +def test_jitter(pool_cls): + rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)] + assert 27 <= min(rnds) <= 28 + assert 35 < max(rnds) < 36 + + +@pytest.mark.slow +@pytest.mark.timing +def test_stats_measures(pool_cls, dsn): + def worker(n): + with p.connection() as conn: + conn.execute("select pg_sleep(0.2)") + + with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p: + p.wait(2.0) + + stats = p.get_stats() + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == min_size(pool_cls, 2) + assert stats["pool_available"] == min_size(pool_cls, 2) + assert stats["requests_waiting"] == 0 + + ts = [spawn(worker, args=(i,)) for i in range(3)] + sleep(0.1) + stats = p.get_stats() + gather(*ts) + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 3 + assert stats["pool_available"] == 0 + assert stats["requests_waiting"] == 0 + + p.wait(2.0) + ts = [spawn(worker, args=(i,)) for i in range(7)] + sleep(0.1) + stats = p.get_stats() + gather(*ts) + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 4 + assert stats["pool_available"] == 0 + assert stats["requests_waiting"] == 3 + + +@pytest.mark.slow +@pytest.mark.timing +def test_stats_usage(pool_cls, dsn): + def worker(n): + try: + with p.connection(timeout=0.3) as conn: + conn.execute("select pg_sleep(0.2)") + except pool.PoolTimeout: + pass + + with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p: + p.wait(2.0) + + ts = [spawn(worker, args=(i,)) for i in range(7)] + gather(*ts) + stats = p.get_stats() + assert stats["requests_num"] == 7 + assert stats["requests_queued"] == 4 + assert 850 <= stats["requests_wait_ms"] <= 950 + assert stats["requests_errors"] == 1 + assert 1150 <= stats["usage_ms"] <= 1250 + assert stats.get("returns_bad", 0) == 0 + + with p.connection() as conn: + conn.close() + p.wait() + stats = p.pop_stats() + assert stats["requests_num"] == 8 + assert stats["returns_bad"] == 1 + with p.connection(): + pass + assert p.get_stats()["requests_num"] == 1 + + +def test_debug_deadlock(pool_cls, dsn): + # https://github.com/psycopg/psycopg/issues/230 + logger = logging.getLogger("psycopg") + handler = logging.StreamHandler() + old_level = logger.level + logger.setLevel(logging.DEBUG) + handler.setLevel(logging.DEBUG) + logger.addHandler(handler) + try: + with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p: + p.wait(timeout=2) + finally: + logger.removeHandler(handler) + logger.setLevel(old_level) + + +@pytest.mark.skipif(not is_async(__name__), reason="async test only") +def test_cancellation_in_queue(pool_cls, dsn): + # https://github.com/psycopg/psycopg/issues/509 + + nconns = 3 + + with pool_cls( + dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1 + ) as p: + p.wait() + + got_conns = [] + ev = Event() + + def worker(i): + try: + logging.info("worker %s started", i) + nonlocal got_conns + + with p.connection() as conn: + logging.info("worker %s got conn", i) + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + + got_conns.append(conn) + if len(got_conns) >= nconns: + ev.set() + + sleep(5) + 
except BaseException as ex: + logging.info("worker %s stopped: %r", i, ex) + raise + + # Start tasks taking up all the connections and getting in the queue + tasks = [spawn(worker, (i,)) for i in range(nconns * 3)] + + # wait until the pool has served all the connections and clients are queued. + ev.wait(3.0) + for i in range(10): + if p.get_stats().get("requests_queued", 0): + break + else: + sleep(0.1) + else: + pytest.fail("no client got in the queue") + + [task.cancel() for task in reversed(tasks)] + gather(*tasks, return_exceptions=True, timeout=1.0) + + stats = p.get_stats() + assert stats["pool_available"] == min_size(pool_cls, nconns) + assert stats.get("requests_waiting", 0) == 0 + + with p.connection() as conn: + cur = conn.execute("select 1") + assert cur.fetchone() == (1,) + + +def min_size(pool_cls, num=1): + """Return the minimum min_size supported by the pool class.""" + if pool_cls is pool.ConnectionPool: + return num + elif pool_cls is pool.NullConnectionPool: + return 0 + else: + assert False, pool_cls + + +def delay_connection(monkeypatch, sec): + """ + Return a _connect_gen function delayed by the amount of seconds + """ + + def connect_delay(*args, **kwargs): + t0 = time() + rv = connect_orig(*args, **kwargs) + t1 = time() + sleep(max(0, sec - (t1 - t0))) + return rv + + connect_orig = psycopg.Connection.connect + monkeypatch.setattr(psycopg.Connection, "connect", connect_delay) + + +def ensure_waiting(p, num=1): + """ + Wait until there are at least *num* clients waiting in the queue. + """ + while len(p._waiting) < num: + sleep(0) diff --git a/tests/pool/test_pool_common_async.py b/tests/pool/test_pool_common_async.py new file mode 100644 index 000000000..61472e0f0 --- /dev/null +++ b/tests/pool/test_pool_common_async.py @@ -0,0 +1,634 @@ +import logging +from time import time +from typing import Any, List, Tuple + +import pytest + +import psycopg + +from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async + +try: + import psycopg_pool as pool +except ImportError: + # Tests should have been skipped if the package is not available + pass + +if True: # ASYNC + pytestmark = [pytest.mark.anyio] + + +@pytest.fixture(params=[pool.AsyncConnectionPool, pool.AsyncNullConnectionPool]) +def pool_cls(request): + return request.param + + +async def test_defaults(pool_cls, dsn): + async with pool_cls(dsn) as p: + assert p.timeout == 30 + assert p.max_idle == 10 * 60 + assert p.max_lifetime == 60 * 60 + assert p.num_workers == 3 + + +async def test_connection_class(pool_cls, dsn): + class MyConn(psycopg.AsyncConnection[Any]): + pass + + async with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p: + async with p.connection() as conn: + assert isinstance(conn, MyConn) + + +async def test_kwargs(pool_cls, dsn): + async with pool_cls( + dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls) + ) as p: + async with p.connection() as conn: + assert conn.autocommit + + +async def test_context(pool_cls, dsn): + async with pool_cls(dsn, min_size=min_size(pool_cls)) as p: + assert not p.closed + assert p.closed + + +async def test_wait_closed(pool_cls, dsn): + async with pool_cls(dsn) as p: + pass + + with pytest.raises(pool.PoolClosed): + await p.wait() + + +@pytest.mark.slow +async def test_setup_no_timeout(pool_cls, dsn, proxy): + with pytest.raises(pool.PoolTimeout): + async with pool_cls( + proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1 + ) as p: + await p.wait(0.2) + + async with pool_cls( + proxy.client_dsn, 
min_size=min_size(pool_cls), num_workers=1 + ) as p: + await asleep(0.5) + assert not p._pool + proxy.start() + + async with p.connection() as conn: + await conn.execute("select 1") + + +@pytest.mark.slow +async def test_configure_badstate(pool_cls, dsn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg.pool") + + async def configure(conn): + await conn.execute("select 1") + + async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p: + with pytest.raises(pool.PoolTimeout): + await p.wait(timeout=0.5) + + assert caplog.records + assert "INTRANS" in caplog.records[0].message + + +@pytest.mark.slow +async def test_configure_broken(pool_cls, dsn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg.pool") + + async def configure(conn): + async with conn.transaction(): + await conn.execute("WAT") + + async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p: + with pytest.raises(pool.PoolTimeout): + await p.wait(timeout=0.5) + + assert caplog.records + assert "WAT" in caplog.records[0].message + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +async def test_queue(pool_cls, dsn): + async def worker(n): + t0 = time() + async with p.connection() as conn: + await conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p: + await p.wait() + ts = [spawn(worker, args=(i,)) for i in range(6)] + await gather(*ts) + + times = [item[1] for item in results] + want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6] + for got, want in zip(times, want_times): + assert got == pytest.approx(want, 0.2), times + + assert len(set(r[2] for r in results)) == 2, results + + +@pytest.mark.slow +async def test_queue_size(pool_cls, dsn): + async def worker(t, ev=None): + try: + async with p.connection(): + if ev: + ev.set() + await asleep(t) + except pool.TooManyRequests as e: + errors.append(e) + else: + success.append(True) + + errors: List[Exception] = [] + success: List[bool] = [] + + async with pool_cls( + dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3 + ) as p: + await p.wait() + ev = AEvent() + spawn(worker, args=(0.3, ev)) + await ev.wait() + + ts = [spawn(worker, args=(0.1,)) for i in range(4)] + await gather(*ts) + + assert len(success) == 4 + assert len(errors) == 1 + assert isinstance(errors[0], pool.TooManyRequests) + assert p.name in str(errors[0]) + assert str(p.max_waiting) in str(errors[0]) + assert p.get_stats()["requests_errors"] == 1 + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +async def test_queue_timeout(pool_cls, dsn): + async def worker(n): + t0 = time() + try: + async with p.connection() as conn: + await conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + except pool.PoolTimeout as e: + t1 = time() + errors.append((n, t1 - t0, e)) + else: + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + errors: List[Tuple[int, float, Exception]] = [] + + async with pool_cls( + dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1 + ) as p: + ts = [spawn(worker, args=(i,)) for i in range(4)] + await gather(*ts) + + assert len(results) == 2 + assert len(errors) == 2 + for e in errors: + assert 0.1 < e[1] < 0.15 + + +@pytest.mark.slow +@pytest.mark.timing +async def test_dead_client(pool_cls, dsn): + async def worker(i, 
timeout): + try: + async with p.connection(timeout=timeout) as conn: + await conn.execute("select pg_sleep(0.3)") + results.append(i) + except pool.PoolTimeout: + if timeout > 0.2: + raise + + async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p: + results: List[int] = [] + ts = [ + spawn(worker, args=(i, timeout)) + for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4]) + ] + await gather(*ts) + + await asleep(0.2) + assert set(results) == set([0, 1, 3, 4]) + if pool_cls is pool.AsyncConnectionPool: + assert len(p._pool) == 2 # no connection was lost + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.crdb_skip("backend pid") +async def test_queue_timeout_override(pool_cls, dsn): + async def worker(n): + t0 = time() + timeout = 0.25 if n == 3 else None + try: + async with p.connection(timeout=timeout) as conn: + await conn.execute("select pg_sleep(0.2)") + pid = conn.info.backend_pid + except pool.PoolTimeout as e: + t1 = time() + errors.append((n, t1 - t0, e)) + else: + t1 = time() + results.append((n, t1 - t0, pid)) + + results: List[Tuple[int, float, int]] = [] + errors: List[Tuple[int, float, Exception]] = [] + + async with pool_cls( + dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1 + ) as p: + ts = [spawn(worker, args=(i,)) for i in range(4)] + await gather(*ts) + + assert len(results) == 3 + assert len(errors) == 1 + for e in errors: + assert 0.1 < e[1] < 0.15 + + +@pytest.mark.crdb_skip("backend pid") +async def test_broken_reconnect(pool_cls, dsn): + async with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p: + async with p.connection() as conn: + pid1 = conn.info.backend_pid + await conn.close() + + async with p.connection() as conn2: + pid2 = conn2.info.backend_pid + + assert pid1 != pid2 + + +async def test_close_no_tasks(pool_cls, dsn): + p = pool_cls(dsn) + assert p._sched_runner and is_alive(p._sched_runner) + workers = p._workers[:] + assert workers + for t in workers: + assert is_alive(t) + + await p.close() + assert p._sched_runner is None + assert not p._workers + for t in workers: + assert not is_alive(t) + + +async def test_putconn_no_pool(pool_cls, aconn_cls, dsn): + async with pool_cls(dsn, min_size=min_size(pool_cls)) as p: + conn = await aconn_cls.connect(dsn) + with pytest.raises(ValueError): + await p.putconn(conn) + + await conn.close() + + +async def test_putconn_wrong_pool(pool_cls, dsn): + async with pool_cls(dsn, min_size=min_size(pool_cls)) as p1: + async with pool_cls(dsn, min_size=min_size(pool_cls)) as p2: + conn = await p1.getconn() + with pytest.raises(ValueError): + await p2.putconn(conn) + + +@pytest.mark.slow +@pytest.mark.skipif(is_async(__name__), reason="sync test only") +async def test_del_stops_threads(pool_cls, dsn): + p = pool_cls(dsn) + assert p._sched_runner is not None + ts = [p._sched_runner] + p._workers + del p + await asleep(0.1) + for t in ts: + assert not is_alive(t), t + + +async def test_closed_getconn(pool_cls, dsn): + p = pool_cls(dsn, min_size=min_size(pool_cls)) + assert not p.closed + async with p.connection(): + pass + + await p.close() + assert p.closed + + with pytest.raises(pool.PoolClosed): + async with p.connection(): + pass + + +async def test_close_connection_on_pool_close(pool_cls, dsn): + p = pool_cls(dsn, min_size=min_size(pool_cls)) + async with p.connection() as conn: + await p.close() + assert conn.closed + + +async def test_closed_queue(pool_cls, dsn): + async def w1(): + async with p.connection() as conn: + e1.set() # Tell w0 that w1 got a connection + cur = 
await conn.execute("select 1") + assert await cur.fetchone() == (1,) + await e2.wait() # Wait until w0 has tested w2 + success.append("w1") + + async def w2(): + try: + async with p.connection(): + pass # unexpected + except pool.PoolClosed: + success.append("w2") + + e1 = AEvent() + e2 = AEvent() + + p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) + await p.wait() + success: List[str] = [] + + t1 = spawn(w1) + # Wait until w1 has received a connection + await e1.wait() + + t2 = spawn(w2) + # Wait until w2 is in the queue + await ensure_waiting(p) + await p.close() + + # Wait for the workers to finish + e2.set() + await gather(t1, t2) + assert len(success) == 2 + + +async def test_open_explicit(pool_cls, dsn): + p = pool_cls(dsn, open=False) + assert p.closed + with pytest.raises(pool.PoolClosed, match="is not open yet"): + await p.getconn() + + with pytest.raises(pool.PoolClosed, match="is not open yet"): + async with p.connection(): + pass + + await p.open() + try: + assert not p.closed + + async with p.connection() as conn: + cur = await conn.execute("select 1") + assert await cur.fetchone() == (1,) + + finally: + await p.close() + + with pytest.raises(pool.PoolClosed, match="is already closed"): + await p.getconn() + + +async def test_open_context(pool_cls, dsn): + p = pool_cls(dsn, open=False) + assert p.closed + + async with p: + assert not p.closed + + async with p.connection() as conn: + cur = await conn.execute("select 1") + assert await cur.fetchone() == (1,) + + assert p.closed + + +async def test_open_no_op(pool_cls, dsn): + p = pool_cls(dsn) + try: + assert not p.closed + await p.open() + assert not p.closed + + async with p.connection() as conn: + cur = await conn.execute("select 1") + assert await cur.fetchone() == (1,) + + finally: + await p.close() + + +async def test_reopen(pool_cls, dsn): + p = pool_cls(dsn) + async with p.connection() as conn: + await conn.execute("select 1") + await p.close() + assert p._sched_runner is None + assert not p._workers + + with pytest.raises(psycopg.OperationalError, match="cannot be reused"): + await p.open() + + +async def test_jitter(pool_cls): + rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)] + assert 27 <= min(rnds) <= 28 + assert 35 < max(rnds) < 36 + + +@pytest.mark.slow +@pytest.mark.timing +async def test_stats_measures(pool_cls, dsn): + async def worker(n): + async with p.connection() as conn: + await conn.execute("select pg_sleep(0.2)") + + async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p: + await p.wait(2.0) + + stats = p.get_stats() + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == min_size(pool_cls, 2) + assert stats["pool_available"] == min_size(pool_cls, 2) + assert stats["requests_waiting"] == 0 + + ts = [spawn(worker, args=(i,)) for i in range(3)] + await asleep(0.1) + stats = p.get_stats() + await gather(*ts) + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 3 + assert stats["pool_available"] == 0 + assert stats["requests_waiting"] == 0 + + await p.wait(2.0) + ts = [spawn(worker, args=(i,)) for i in range(7)] + await asleep(0.1) + stats = p.get_stats() + await gather(*ts) + assert stats["pool_min"] == min_size(pool_cls, 2) + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 4 + assert stats["pool_available"] == 0 + assert stats["requests_waiting"] == 3 + + +@pytest.mark.slow +@pytest.mark.timing +async def test_stats_usage(pool_cls, 
dsn): + async def worker(n): + try: + async with p.connection(timeout=0.3) as conn: + await conn.execute("select pg_sleep(0.2)") + except pool.PoolTimeout: + pass + + async with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p: + await p.wait(2.0) + + ts = [spawn(worker, args=(i,)) for i in range(7)] + await gather(*ts) + stats = p.get_stats() + assert stats["requests_num"] == 7 + assert stats["requests_queued"] == 4 + assert 850 <= stats["requests_wait_ms"] <= 950 + assert stats["requests_errors"] == 1 + assert 1150 <= stats["usage_ms"] <= 1250 + assert stats.get("returns_bad", 0) == 0 + + async with p.connection() as conn: + await conn.close() + await p.wait() + stats = p.pop_stats() + assert stats["requests_num"] == 8 + assert stats["returns_bad"] == 1 + async with p.connection(): + pass + assert p.get_stats()["requests_num"] == 1 + + +async def test_debug_deadlock(pool_cls, dsn): + # https://github.com/psycopg/psycopg/issues/230 + logger = logging.getLogger("psycopg") + handler = logging.StreamHandler() + old_level = logger.level + logger.setLevel(logging.DEBUG) + handler.setLevel(logging.DEBUG) + logger.addHandler(handler) + try: + async with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p: + await p.wait(timeout=2) + finally: + logger.removeHandler(handler) + logger.setLevel(old_level) + + +@pytest.mark.skipif(not is_async(__name__), reason="async test only") +async def test_cancellation_in_queue(pool_cls, dsn): + # https://github.com/psycopg/psycopg/issues/509 + + nconns = 3 + + async with pool_cls( + dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1 + ) as p: + await p.wait() + + got_conns = [] + ev = AEvent() + + async def worker(i): + try: + logging.info("worker %s started", i) + nonlocal got_conns + + async with p.connection() as conn: + logging.info("worker %s got conn", i) + cur = await conn.execute("select 1") + assert (await cur.fetchone()) == (1,) + + got_conns.append(conn) + if len(got_conns) >= nconns: + ev.set() + + await asleep(5) + + except BaseException as ex: + logging.info("worker %s stopped: %r", i, ex) + raise + + # Start tasks taking up all the connections and getting in the queue + tasks = [spawn(worker, (i,)) for i in range(nconns * 3)] + + # wait until the pool has served all the connections and clients are queued. 
+        await ev.wait_timeout(3.0)
+        for i in range(10):
+            if p.get_stats().get("requests_queued", 0):
+                break
+            else:
+                await asleep(0.1)
+        else:
+            pytest.fail("no client got in the queue")
+
+        [task.cancel() for task in reversed(tasks)]
+        await gather(*tasks, return_exceptions=True, timeout=1.0)
+
+        stats = p.get_stats()
+        assert stats["pool_available"] == min_size(pool_cls, nconns)
+        assert stats.get("requests_waiting", 0) == 0
+
+        async with p.connection() as conn:
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+
+
+def min_size(pool_cls, num=1):
+    """Return the minimum min_size supported by the pool class."""
+    if pool_cls is pool.AsyncConnectionPool:
+        return num
+    elif pool_cls is pool.AsyncNullConnectionPool:
+        return 0
+    else:
+        assert False, pool_cls
+
+
+def delay_connection(monkeypatch, sec):
+    """
+    Return a _connect_gen function delayed by the amount of seconds
+    """
+
+    async def connect_delay(*args, **kwargs):
+        t0 = time()
+        rv = await connect_orig(*args, **kwargs)
+        t1 = time()
+        await asleep(max(0, sec - (t1 - t0)))
+        return rv
+
+    connect_orig = psycopg.AsyncConnection.connect
+    monkeypatch.setattr(psycopg.AsyncConnection, "connect", connect_delay)
+
+
+async def ensure_waiting(p, num=1):
+    """
+    Wait until there are at least *num* clients waiting in the queue.
+    """
+    while len(p._waiting) < num:
+        await asleep(0)
diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py
index 4183d7222..3ede5997f 100755
--- a/tools/async_to_sync.py
+++ b/tools/async_to_sync.py
@@ -163,6 +163,7 @@ class RenameAsyncToSync(ast.NodeTransformer):
         "AsyncGenerator": "Generator",
         "AsyncIterator": "Iterator",
         "AsyncLibpqWriter": "LibpqWriter",
+        "AsyncNullConnectionPool": "NullConnectionPool",
         "AsyncPipeline": "Pipeline",
         "AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
         "AsyncRawCursor": "RawCursor",
diff --git a/tools/convert_async_to_sync.sh b/tools/convert_async_to_sync.sh
index 843fd9be3..1b05fda1f 100755
--- a/tools/convert_async_to_sync.sh
+++ b/tools/convert_async_to_sync.sh
@@ -22,6 +22,7 @@ for async in \
     psycopg/psycopg/cursor_async.py \
     psycopg_pool/psycopg_pool/sched_async.py \
     tests/pool/test_pool_async.py \
+    tests/pool/test_pool_common_async.py \
     tests/pool/test_sched_async.py \
     tests/test_connection_async.py \
     tests/test_copy_async.py \