]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(tests): generate most pool tests using parametric fixture
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 12 Sep 2023 04:48:10 +0000 (06:48 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 11 Oct 2023 21:45:38 +0000 (23:45 +0200)
tests/pool/test_null_pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py
tests/pool/test_pool_common.py [new file with mode: 0644]
tests/pool/test_pool_common_async.py [new file with mode: 0644]
tools/async_to_sync.py
tools/convert_async_to_sync.sh

index 55708e50aa0e39dec21cc9e0d75f9de1068e7757..56a016397d4b456760418d1bbcc9f30f9450b538 100644 (file)
@@ -16,23 +16,18 @@ from .test_pool_async import delay_connection, ensure_waiting
 pytestmark = [pytest.mark.anyio]
 
 try:
-    from psycopg_pool import AsyncNullConnectionPool  # noqa: F401
-    from psycopg_pool import PoolClosed, PoolTimeout, TooManyRequests
+    import psycopg_pool as pool
 except ImportError:
     pass
 
 
-async def test_defaults(dsn):
-    async with AsyncNullConnectionPool(dsn) as p:
+async def test_default_sizes(dsn):
+    async with pool.AsyncNullConnectionPool(dsn) as p:
         assert p.min_size == p.max_size == 0
-        assert p.timeout == 30
-        assert p.max_idle == 10 * 60
-        assert p.max_lifetime == 60 * 60
-        assert p.num_workers == 3
 
 
 async def test_min_size_max_size(dsn):
-    async with AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p:
+    async with pool.AsyncNullConnectionPool(dsn, min_size=0, max_size=2) as p:
         assert p.min_size == 0
         assert p.max_size == 2
 
@@ -40,22 +35,7 @@ async def test_min_size_max_size(dsn):
 @pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)])
 async def test_bad_size(dsn, min_size, max_size):
     with pytest.raises(ValueError):
-        AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
-
-
-async def test_connection_class(dsn):
-    class MyConn(psycopg.AsyncConnection[Any]):
-        pass
-
-    async with AsyncNullConnectionPool(dsn, connection_class=MyConn) as p:
-        async with p.connection() as conn:
-            assert isinstance(conn, MyConn)
-
-
-async def test_kwargs(dsn):
-    async with AsyncNullConnectionPool(dsn, kwargs={"autocommit": True}) as p:
-        async with p.connection() as conn:
-            assert conn.autocommit
+        pool.AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
 
 
 class MyRow(Dict[str, Any]):
@@ -69,7 +49,7 @@ async def test_generic_connection_type(dsn):
     class MyConnection(psycopg.AsyncConnection[Row]):
         pass
 
-    async with AsyncNullConnectionPool(
+    async with pool.AsyncNullConnectionPool(
         dsn,
         connection_class=MyConnection[MyRow],
         kwargs={"row_factory": class_row(MyRow)},
@@ -78,19 +58,19 @@ async def test_generic_connection_type(dsn):
         async with p1.connection() as conn1:
             cur1 = await conn1.execute("select 1 as x")
             (row1,) = await cur1.fetchall()
-    assert_type(p1, AsyncNullConnectionPool[MyConnection[MyRow]])
+    assert_type(p1, pool.AsyncNullConnectionPool[MyConnection[MyRow]])
     assert_type(conn1, MyConnection[MyRow])
     assert_type(row1, MyRow)
     assert conn1.autocommit
     assert row1 == {"x": 1}
 
-    async with AsyncNullConnectionPool(
+    async with pool.AsyncNullConnectionPool(
         dsn, connection_class=MyConnection[TupleRow]
     ) as p2:
         async with p2.connection() as conn2:
             cur2 = await conn2.execute("select 2 as y")
             (row2,) = await cur2.fetchall()
-    assert_type(p2, AsyncNullConnectionPool[MyConnection[TupleRow]])
+    assert_type(p2, pool.AsyncNullConnectionPool[MyConnection[TupleRow]])
     assert_type(conn2, MyConnection[TupleRow])
     assert_type(row2, TupleRow)
     assert row2 == (2,)
@@ -105,12 +85,12 @@ async def test_non_generic_connection_type(dsn):
             kwargs["row_factory"] = class_row(MyRow)
             super().__init__(*args, **kwargs)
 
-    async with AsyncNullConnectionPool(
+    async with pool.AsyncNullConnectionPool(
         dsn, connection_class=MyConnection, configure=set_autocommit
     ) as p1:
         async with p1.connection() as conn1:
             (row1,) = await (await conn1.execute("select 1 as x")).fetchall()
-    assert_type(p1, AsyncNullConnectionPool[MyConnection])
+    assert_type(p1, pool.AsyncNullConnectionPool[MyConnection])
     assert_type(conn1, MyConnection)
     assert_type(row1, MyRow)
     assert conn1.autocommit
@@ -119,7 +99,7 @@ async def test_non_generic_connection_type(dsn):
 
 @pytest.mark.crdb_skip("backend pid")
 async def test_its_no_pool_at_all(dsn):
-    async with AsyncNullConnectionPool(dsn, max_size=2) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=2) as p:
         async with p.connection() as conn:
             pid1 = conn.info.backend_pid
 
@@ -130,47 +110,18 @@ async def test_its_no_pool_at_all(dsn):
             assert conn.info.backend_pid not in (pid1, pid2)
 
 
-async def test_context(dsn):
-    async with AsyncNullConnectionPool(dsn) as p:
-        assert not p.closed
-    assert p.closed
-
-
 @pytest.mark.slow
 @pytest.mark.timing
 async def test_wait_ready(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.2)
-    with pytest.raises(PoolTimeout):
-        async with AsyncNullConnectionPool(dsn, num_workers=1) as p:
+    with pytest.raises(pool.PoolTimeout):
+        async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p:
             await p.wait(0.1)
 
-    async with AsyncNullConnectionPool(dsn, num_workers=1) as p:
+    async with pool.AsyncNullConnectionPool(dsn, num_workers=1) as p:
         await p.wait(0.4)
 
 
-async def test_wait_closed(dsn):
-    async with AsyncNullConnectionPool(dsn) as p:
-        pass
-
-    with pytest.raises(PoolClosed):
-        await p.wait()
-
-
-@pytest.mark.slow
-async def test_setup_no_timeout(dsn, proxy):
-    with pytest.raises(PoolTimeout):
-        async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p:
-            await p.wait(0.2)
-
-    async with AsyncNullConnectionPool(proxy.client_dsn, num_workers=1) as p:
-        await asyncio.sleep(0.5)
-        assert not p._pool
-        proxy.start()
-
-        async with p.connection() as conn:
-            await conn.execute("select 1")
-
-
 async def test_configure(dsn):
     inits = 0
 
@@ -180,7 +131,7 @@ async def test_configure(dsn):
         async with conn.transaction():
             await conn.execute("set default_transaction_read_only to on")
 
-    async with AsyncNullConnectionPool(dsn, configure=configure) as p:
+    async with pool.AsyncNullConnectionPool(dsn, configure=configure) as p:
         async with p.connection() as conn:
             assert inits == 1
             res = await conn.execute("show default_transaction_read_only")
@@ -198,37 +149,6 @@ async def test_configure(dsn):
             assert (await res.fetchone())[0] == "on"  # type: ignore[index]
 
 
-@pytest.mark.slow
-async def test_configure_badstate(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    async def configure(conn):
-        await conn.execute("select 1")
-
-    async with AsyncNullConnectionPool(dsn, configure=configure) as p:
-        with pytest.raises(PoolTimeout):
-            await p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-async def test_configure_broken(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    async def configure(conn):
-        async with conn.transaction():
-            await conn.execute("WAT")
-
-    async with AsyncNullConnectionPool(dsn, configure=configure) as p:
-        with pytest.raises(PoolTimeout):
-            await p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "WAT" in caplog.records[0].message
-
-
 @pytest.mark.crdb_skip("backend pid")
 async def test_reset(dsn):
     resets = 0
@@ -252,7 +172,7 @@ async def test_reset(dsn):
             assert (await cur.fetchone()) == ("UTC",)
             pids.append(conn.info.backend_pid)
 
-    async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
             # Queue the worker so it will take the same connection a second time
             # instead of making a new one.
@@ -284,7 +204,7 @@ async def test_reset_badstate(dsn, caplog):
             await conn.execute("select 1")
             pids.append(conn.info.backend_pid)
 
-    async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
             t = create_task(worker())
             await ensure_waiting(p)
@@ -314,7 +234,7 @@ async def test_reset_broken(dsn, caplog):
             await conn.execute("select 1")
             pids.append(conn.info.backend_pid)
 
-    async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
             t = create_task(worker())
             await ensure_waiting(p)
@@ -332,171 +252,14 @@ async def test_reset_broken(dsn, caplog):
 @pytest.mark.slow
 @pytest.mark.skipif("ver(psycopg.__version__) < ver('3.0.8')")
 async def test_no_queue_timeout(deaf_port):
-    async with AsyncNullConnectionPool(
+    async with pool.AsyncNullConnectionPool(
         kwargs={"host": "localhost", "port": deaf_port}
     ) as p:
-        with pytest.raises(PoolTimeout):
+        with pytest.raises(pool.PoolTimeout):
             async with p.connection(timeout=1):
                 pass
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue(dsn):
-    async def worker(n):
-        t0 = time()
-        async with p.connection() as conn:
-            await conn.execute("select pg_sleep(0.2)")
-            pid = conn.info.backend_pid
-        t1 = time()
-        results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    async with AsyncNullConnectionPool(dsn, max_size=2) as p:
-        await p.wait()
-        ts = [create_task(worker(i)) for i in range(6)]
-        await asyncio.gather(*ts)
-
-    times = [item[1] for item in results]
-    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-    for got, want in zip(times, want_times):
-        assert got == pytest.approx(want, 0.2), times
-
-    assert len(set(r[2] for r in results)) == 2, results
-
-
-@pytest.mark.slow
-async def test_queue_size(dsn):
-    async def worker(t, ev=None):
-        try:
-            async with p.connection():
-                if ev:
-                    ev.set()
-                await asyncio.sleep(t)
-        except TooManyRequests as e:
-            errors.append(e)
-        else:
-            success.append(True)
-
-    errors: List[Exception] = []
-    success: List[bool] = []
-
-    async with AsyncNullConnectionPool(dsn, max_size=1, max_waiting=3) as p:
-        await p.wait()
-        ev = asyncio.Event()
-        create_task(worker(0.3, ev))
-        await ev.wait()
-
-        ts = [create_task(worker(0.1)) for i in range(4)]
-        await asyncio.gather(*ts)
-
-    assert len(success) == 4
-    assert len(errors) == 1
-    assert isinstance(errors[0], TooManyRequests)
-    assert p.name in str(errors[0])
-    assert str(p.max_waiting) in str(errors[0])
-    assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout(dsn):
-    async def worker(n):
-        t0 = time()
-        try:
-            async with p.connection() as conn:
-                await conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
-        ts = [create_task(worker(i)) for i in range(4)]
-        await asyncio.gather(*ts)
-
-    assert len(results) == 2
-    assert len(errors) == 2
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_dead_client(dsn):
-    async def worker(i, timeout):
-        try:
-            async with p.connection(timeout=timeout) as conn:
-                await conn.execute("select pg_sleep(0.3)")
-                results.append(i)
-        except PoolTimeout:
-            if timeout > 0.2:
-                raise
-
-    async with AsyncNullConnectionPool(dsn, max_size=2) as p:
-        results: List[int] = []
-        ts = [
-            create_task(worker(i, timeout))
-            for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
-        ]
-        await asyncio.gather(*ts)
-
-        await asyncio.sleep(0.2)
-        assert set(results) == set([0, 1, 3, 4])
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout_override(dsn):
-    async def worker(n):
-        t0 = time()
-        timeout = 0.25 if n == 3 else None
-        try:
-            async with p.connection(timeout=timeout) as conn:
-                await conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
-        ts = [create_task(worker(i)) for i in range(4)]
-        await asyncio.gather(*ts)
-
-    assert len(results) == 3
-    assert len(errors) == 1
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-async def test_broken_reconnect(dsn):
-    async with AsyncNullConnectionPool(dsn, max_size=1) as p:
-        async with p.connection() as conn:
-            pid1 = conn.info.backend_pid
-            await conn.close()
-
-        async with p.connection() as conn2:
-            pid2 = conn2.info.backend_pid
-
-    assert pid1 != pid2
-
-
 @pytest.mark.crdb_skip("backend pid")
 async def test_intrans_rollback(dsn, caplog):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
@@ -511,7 +274,7 @@ async def test_intrans_rollback(dsn, caplog):
             )
             assert not await cur.fetchone()
 
-    async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
 
         # Queue the worker so it will take the connection a second time instead
@@ -540,7 +303,7 @@ async def test_inerror_rollback(dsn, caplog):
             pids.append(conn.info.backend_pid)
             assert conn.info.transaction_status == TransactionStatus.IDLE
 
-    async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
 
         t = create_task(worker())
@@ -569,7 +332,7 @@ async def test_active_close(dsn, caplog):
             pids.append(conn.info.backend_pid)
             assert conn.info.transaction_status == TransactionStatus.IDLE
 
-    async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
 
         t = create_task(worker())
@@ -597,7 +360,7 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
             pids.append(conn.info.backend_pid)
             assert conn.info.transaction_status == TransactionStatus.IDLE
 
-    async with AsyncNullConnectionPool(dsn, max_size=1) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
         t = create_task(worker())
         await ensure_waiting(p)
@@ -624,170 +387,16 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
     assert "BAD" in caplog.records[2].message
 
 
-async def test_close_no_tasks(dsn):
-    p = AsyncNullConnectionPool(dsn)
-    assert p._sched_runner and not p._sched_runner.done()
-    assert p._workers
-    workers = p._workers[:]
-    for t in workers:
-        assert not t.done()
-
-    await p.close()
-    assert p._sched_runner is None
-    assert not p._workers
-    for t in workers:
-        assert t.done()
-
-
-async def test_putconn_no_pool(aconn_cls, dsn):
-    async with AsyncNullConnectionPool(dsn) as p:
-        conn = await aconn_cls.connect(dsn)
-        with pytest.raises(ValueError):
-            await p.putconn(conn)
-
-    await conn.close()
-
-
-async def test_putconn_wrong_pool(dsn):
-    async with AsyncNullConnectionPool(dsn) as p1:
-        async with AsyncNullConnectionPool(dsn) as p2:
-            conn = await p1.getconn()
-            with pytest.raises(ValueError):
-                await p2.putconn(conn)
-
-
-async def test_closed_getconn(dsn):
-    p = AsyncNullConnectionPool(dsn)
-    assert not p.closed
-    async with p.connection():
-        pass
-
-    await p.close()
-    assert p.closed
-
-    with pytest.raises(PoolClosed):
-        async with p.connection():
-            pass
-
-
 async def test_closed_putconn(dsn):
-    p = AsyncNullConnectionPool(dsn)
-
-    async with p.connection() as conn:
-        pass
-    assert conn.closed
-
-    async with p.connection() as conn:
-        await p.close()
-    assert conn.closed
-
-
-async def test_closed_queue(dsn):
-    async def w1():
+    async with pool.AsyncNullConnectionPool(dsn) as p:
         async with p.connection() as conn:
-            e1.set()  # Tell w0 that w1 got a connection
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-            await e2.wait()  # Wait until w0 has tested w2
-        success.append("w1")
-
-    async def w2():
-        try:
-            async with p.connection():
-                pass  # unexpected
-        except PoolClosed:
-            success.append("w2")
-
-    e1 = asyncio.Event()
-    e2 = asyncio.Event()
-
-    p = AsyncNullConnectionPool(dsn, max_size=1)
-    await p.wait()
-    success: List[str] = []
-
-    t1 = create_task(w1())
-    # Wait until w1 has received a connection
-    await e1.wait()
-
-    t2 = create_task(w2())
-    # Wait until w2 is in the queue
-    await ensure_waiting(p)
-    await p.close()
-
-    # Wait for the workers to finish
-    e2.set()
-    await asyncio.gather(t1, t2)
-    assert len(success) == 2
-
-
-async def test_open_explicit(dsn):
-    p = AsyncNullConnectionPool(dsn, open=False)
-    assert p.closed
-    with pytest.raises(PoolClosed):
-        await p.getconn()
-
-    with pytest.raises(PoolClosed, match="is not open yet"):
-        async with p.connection():
             pass
-
-    await p.open()
-    try:
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    finally:
-        await p.close()
-
-    with pytest.raises(PoolClosed, match="is already closed"):
-        await p.getconn()
-
-
-async def test_open_context(dsn):
-    p = AsyncNullConnectionPool(dsn, open=False)
-    assert p.closed
-
-    async with p:
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    assert p.closed
-
-
-async def test_open_no_op(dsn):
-    p = AsyncNullConnectionPool(dsn)
-    try:
-        assert not p.closed
-        await p.open()
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    finally:
-        await p.close()
-
-
-async def test_reopen(dsn):
-    p = AsyncNullConnectionPool(dsn)
-    async with p.connection() as conn:
-        await conn.execute("select 1")
-    await p.close()
-    assert p._sched_runner is None
-
-    with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
-        await p.open()
+        assert conn.closed
 
 
 @pytest.mark.parametrize("min_size, max_size", [(1, None), (-1, None), (0, -2)])
 async def test_bad_resize(dsn, min_size, max_size):
-    async with AsyncNullConnectionPool() as p:
+    async with pool.AsyncNullConnectionPool() as p:
         with pytest.raises(ValueError):
             await p.resize(min_size=min_size, max_size=max_size)
 
@@ -803,7 +412,7 @@ async def test_max_lifetime(dsn):
             pids.append(conn.info.backend_pid)
             await asyncio.sleep(0.1)
 
-    async with AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p:
+    async with pool.AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p:
         ts = [create_task(worker()) for i in range(5)]
         await asyncio.gather(*ts)
 
@@ -812,88 +421,15 @@ async def test_max_lifetime(dsn):
 
 async def test_check(dsn):
     # no.op
-    async with AsyncNullConnectionPool(dsn) as p:
+    async with pool.AsyncNullConnectionPool(dsn) as p:
         await p.check()
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_measures(dsn):
-    async def worker(n):
-        async with p.connection() as conn:
-            await conn.execute("select pg_sleep(0.2)")
-
-    async with AsyncNullConnectionPool(dsn, max_size=4) as p:
-        await p.wait(2.0)
-
-        stats = p.get_stats()
-        assert stats["pool_min"] == 0
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 0
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 0
-
-        ts = [create_task(worker(i)) for i in range(3)]
-        await asyncio.sleep(0.1)
-        stats = p.get_stats()
-        await asyncio.gather(*ts)
-        assert stats["pool_min"] == 0
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 3
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 0
-
-        await p.wait(2.0)
-        ts = [create_task(worker(i)) for i in range(7)]
-        await asyncio.sleep(0.1)
-        stats = p.get_stats()
-        await asyncio.gather(*ts)
-        assert stats["pool_min"] == 0
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 4
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_usage(dsn):
-    async def worker(n):
-        try:
-            async with p.connection(timeout=0.3) as conn:
-                await conn.execute("select pg_sleep(0.2)")
-        except PoolTimeout:
-            pass
-
-    async with AsyncNullConnectionPool(dsn, max_size=3) as p:
-        await p.wait(2.0)
-
-        ts = [create_task(worker(i)) for i in range(7)]
-        await asyncio.gather(*ts)
-        stats = p.get_stats()
-        assert stats["requests_num"] == 7
-        assert stats["requests_queued"] == 4
-        assert 850 <= stats["requests_wait_ms"] <= 950
-        assert stats["requests_errors"] == 1
-        assert 1150 <= stats["usage_ms"] <= 1250
-        assert stats.get("returns_bad", 0) == 0
-
-        async with p.connection() as conn:
-            await conn.close()
-        await p.wait()
-        stats = p.pop_stats()
-        assert stats["requests_num"] == 8
-        assert stats["returns_bad"] == 1
-        async with p.connection():
-            pass
-        assert p.get_stats()["requests_num"] == 1
-
-
 @pytest.mark.slow
 async def test_stats_connect(dsn, proxy, monkeypatch):
     proxy.start()
     delay_connection(monkeypatch, 0.2)
-    async with AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p:
+    async with pool.AsyncNullConnectionPool(proxy.client_dsn, max_size=3) as p:
         await p.wait()
         stats = p.get_stats()
         assert stats["connections_num"] == 1
@@ -907,7 +443,7 @@ async def test_cancellation_in_queue(dsn):
 
     nconns = 3
 
-    async with AsyncNullConnectionPool(
+    async with pool.AsyncNullConnectionPool(
         dsn, min_size=0, max_size=nconns, timeout=1
     ) as p:
         await p.wait()
index 2032fab89c9d448324c396d2e5bf08d4fe514429..fc353069ec394ef44b4edf2567fbb750c58ac9e7 100644 (file)
@@ -13,7 +13,7 @@ from psycopg.pq import TransactionStatus
 from psycopg.rows import class_row, Row, TupleRow
 from psycopg._compat import assert_type, Counter
 
-from ..utils import Event, spawn, gather, sleep, is_alive, is_async
+from ..utils import Event, spawn, gather, sleep, is_async
 
 try:
     import psycopg_pool as pool
@@ -22,13 +22,9 @@ except ImportError:
     pass
 
 
-def test_defaults(dsn):
+def test_default_sizes(dsn):
     with pool.ConnectionPool(dsn) as p:
         assert p.min_size == p.max_size == 4
-        assert p.timeout == 30
-        assert p.max_idle == 10 * 60
-        assert p.max_lifetime == 60 * 60
-        assert p.num_workers == 3
 
 
 @pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)])
@@ -44,21 +40,6 @@ def test_bad_size(dsn, min_size, max_size):
         pool.ConnectionPool(min_size=min_size, max_size=max_size)
 
 
-def test_connection_class(dsn):
-    class MyConn(psycopg.Connection[Any]):
-        pass
-
-    with pool.ConnectionPool(dsn, connection_class=MyConn, min_size=1) as p:
-        with p.connection() as conn:
-            assert isinstance(conn, MyConn)
-
-
-def test_kwargs(dsn):
-    with pool.ConnectionPool(dsn, kwargs={"autocommit": True}, min_size=1) as p:
-        with p.connection() as conn:
-            assert conn.autocommit
-
-
 class MyRow(Dict[str, Any]):
     ...
 
@@ -130,12 +111,6 @@ def test_its_really_a_pool(dsn):
             assert conn.info.backend_pid in (pid1, pid2)
 
 
-def test_context(dsn):
-    with pool.ConnectionPool(dsn, min_size=1) as p:
-        assert not p.closed
-    assert p.closed
-
-
 @pytest.mark.crdb_skip("backend pid")
 def test_connection_not_lost(dsn):
     with pool.ConnectionPool(dsn, min_size=1) as p:
@@ -187,29 +162,6 @@ def test_wait_ready(dsn, monkeypatch):
         p.wait(0.0001)  # idempotent
 
 
-def test_wait_closed(dsn):
-    with pool.ConnectionPool(dsn) as p:
-        pass
-
-    with pytest.raises(pool.PoolClosed):
-        p.wait()
-
-
-@pytest.mark.slow
-def test_setup_no_timeout(dsn, proxy):
-    with pytest.raises(pool.PoolTimeout):
-        with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p:
-            p.wait(0.2)
-
-    with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p:
-        sleep(0.5)
-        assert not p._pool
-        proxy.start()
-
-        with p.connection() as conn:
-            conn.execute("select 1")
-
-
 def test_configure(dsn):
     inits = 0
 
@@ -238,37 +190,6 @@ def test_configure(dsn):
             assert res.fetchone()[0] == "on"  # type: ignore[index]
 
 
-@pytest.mark.slow
-def test_configure_badstate(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    def configure(conn):
-        conn.execute("select 1")
-
-    with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p:
-        with pytest.raises(pool.PoolTimeout):
-            p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-def test_configure_broken(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    def configure(conn):
-        with conn.transaction():
-            conn.execute("WAT")
-
-    with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p:
-        with pytest.raises(pool.PoolTimeout):
-            p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "WAT" in caplog.records[0].message
-
-
 def test_reset(dsn):
     resets = 0
 
@@ -341,164 +262,6 @@ def test_reset_broken(dsn, caplog):
     assert "WAT" in caplog.records[0].message
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue(dsn):
-    def worker(n):
-        t0 = time()
-        with p.connection() as conn:
-            conn.execute("select pg_sleep(0.2)")
-            pid = conn.info.backend_pid
-        t1 = time()
-        results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    with pool.ConnectionPool(dsn, min_size=2) as p:
-        p.wait()
-        ts = [spawn(worker, args=(i,)) for i in range(6)]
-        gather(*ts)
-
-    times = [item[1] for item in results]
-    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-    for got, want in zip(times, want_times):
-        assert got == pytest.approx(want, 0.1), times
-
-    assert len(set((r[2] for r in results))) == 2, results
-
-
-@pytest.mark.slow
-def test_queue_size(dsn):
-    def worker(t, ev=None):
-        try:
-            with p.connection():
-                if ev:
-                    ev.set()
-                sleep(t)
-        except pool.TooManyRequests as e:
-            errors.append(e)
-        else:
-            success.append(True)
-
-    errors: List[Exception] = []
-    success: List[bool] = []
-
-    with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p:
-        p.wait()
-        ev = Event()
-        spawn(worker, args=(0.3, ev))
-        ev.wait()
-
-        ts = [spawn(worker, args=(0.1,)) for i in range(4)]
-        gather(*ts)
-
-    assert len(success) == 4
-    assert len(errors) == 1
-    assert isinstance(errors[0], pool.TooManyRequests)
-    assert p.name in str(errors[0])
-    assert str(p.max_waiting) in str(errors[0])
-    assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue_timeout(dsn):
-    def worker(n):
-        t0 = time()
-        try:
-            with p.connection() as conn:
-                conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except pool.PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-        ts = [spawn(worker, args=(i,)) for i in range(4)]
-        gather(*ts)
-
-    assert len(results) == 2
-    assert len(errors) == 2
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-def test_dead_client(dsn):
-    def worker(i, timeout):
-        try:
-            with p.connection(timeout=timeout) as conn:
-                conn.execute("select pg_sleep(0.3)")
-                results.append(i)
-        except pool.PoolTimeout:
-            if timeout > 0.2:
-                raise
-
-    with pool.ConnectionPool(dsn, min_size=2) as p:
-        results: List[int] = []
-        ts = [
-            spawn(worker, args=(i, timeout))
-            for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
-        ]
-        gather(*ts)
-
-        sleep(0.2)
-        assert set(results) == set([0, 1, 3, 4])
-        assert len(p._pool) == 2  # no connection was lost
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-def test_queue_timeout_override(dsn):
-    def worker(n):
-        t0 = time()
-        timeout = 0.25 if n == 3 else None
-        try:
-            with p.connection(timeout=timeout) as conn:
-                conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except pool.PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-        ts = [spawn(worker, args=(i,)) for i in range(4)]
-        gather(*ts)
-
-    assert len(results) == 3
-    assert len(errors) == 1
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-def test_broken_reconnect(dsn):
-    with pool.ConnectionPool(dsn, min_size=1) as p:
-        with p.connection() as conn:
-            pid1 = conn.info.backend_pid
-            conn.close()
-
-        with p.connection() as conn2:
-            pid2 = conn2.info.backend_pid
-
-    assert pid1 != pid2
-
-
 @pytest.mark.crdb_skip("backend pid")
 def test_intrans_rollback(dsn, caplog):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
@@ -594,38 +357,6 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch):
     assert "BAD" in caplog.records[2].message
 
 
-def test_close_no_tasks(dsn):
-    p = pool.ConnectionPool(dsn)
-    assert p._sched_runner and is_alive(p._sched_runner)
-    workers = p._workers[:]
-    assert workers
-    for t in workers:
-        assert is_alive(t)
-
-    p.close()
-    assert p._sched_runner is None
-    assert not p._workers
-    for t in workers:
-        assert not is_alive(t)
-
-
-def test_putconn_no_pool(conn_cls, dsn):
-    with pool.ConnectionPool(dsn, min_size=1) as p:
-        conn = conn_cls.connect(dsn)
-        with pytest.raises(ValueError):
-            p.putconn(conn)
-
-    conn.close()
-
-
-def test_putconn_wrong_pool(dsn):
-    with pool.ConnectionPool(dsn, min_size=1) as p1:
-        with pool.ConnectionPool(dsn, min_size=1) as p2:
-            conn = p1.getconn()
-            with pytest.raises(ValueError):
-                p2.putconn(conn)
-
-
 def test_del_no_warning(dsn, recwarn):
     p = pool.ConnectionPool(dsn, min_size=2)
     with p.connection() as conn:
@@ -638,132 +369,11 @@ def test_del_no_warning(dsn, recwarn):
     assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
-@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
-def test_del_stops_threads(dsn):
-    p = pool.ConnectionPool(dsn)
-    assert p._sched_runner is not None
-    ts = [p._sched_runner] + p._workers
-    del p
-    sleep(0.1)
-    for t in ts:
-        assert not is_alive(t), t
-
-
-def test_closed_getconn(dsn):
-    p = pool.ConnectionPool(dsn, min_size=1)
-    assert not p.closed
-    with p.connection():
-        pass
-
-    p.close()
-    assert p.closed
-
-    with pytest.raises(pool.PoolClosed):
-        with p.connection():
-            pass
-
-
 def test_closed_putconn(dsn):
-    p = pool.ConnectionPool(dsn, min_size=1)
-
-    with p.connection() as conn:
-        pass
-    assert not conn.closed
-
-    with p.connection() as conn:
-        p.close()
-    assert conn.closed
-
-
-def test_closed_queue(dsn):
-    def w1():
+    with pool.ConnectionPool(dsn, min_size=1) as p:
         with p.connection() as conn:
-            e1.set()  # Tell w0 that w1 got a connection
-            cur = conn.execute("select 1")
-            assert cur.fetchone() == (1,)
-            e2.wait()  # Wait until w0 has tested w2
-        success.append("w1")
-
-    def w2():
-        try:
-            with p.connection():
-                pass  # unexpected
-        except pool.PoolClosed:
-            success.append("w2")
-
-    e1 = Event()
-    e2 = Event()
-
-    p = pool.ConnectionPool(dsn, min_size=1)
-    p.wait()
-    success: List[str] = []
-
-    t1 = spawn(w1)
-    # Wait until w1 has received a connection
-    e1.wait()
-
-    t2 = spawn(w2)
-    # Wait until w2 is in the queue
-    ensure_waiting(p)
-    p.close()
-
-    # Wait for the workers to finish
-    e2.set()
-    gather(t1, t2)
-    assert len(success) == 2
-
-
-def test_open_explicit(dsn):
-    p = pool.ConnectionPool(dsn, open=False)
-    assert p.closed
-    with pytest.raises(pool.PoolClosed, match="is not open yet"):
-        p.getconn()
-
-    with pytest.raises(pool.PoolClosed, match="is not open yet"):
-        with p.connection():
             pass
-
-    p.open()
-    try:
-        assert not p.closed
-
-        with p.connection() as conn:
-            cur = conn.execute("select 1")
-            assert cur.fetchone() == (1,)
-    finally:
-        p.close()
-
-    with pytest.raises(pool.PoolClosed, match="is already closed"):
-        p.getconn()
-
-
-def test_open_context(dsn):
-    p = pool.ConnectionPool(dsn, open=False)
-    assert p.closed
-
-    with p:
-        assert not p.closed
-
-        with p.connection() as conn:
-            cur = conn.execute("select 1")
-            assert cur.fetchone() == (1,)
-
-    assert p.closed
-
-
-def test_open_no_op(dsn):
-    p = pool.ConnectionPool(dsn)
-    try:
-        assert not p.closed
-        p.open()
-        assert not p.closed
-
-        with p.connection() as conn:
-            cur = conn.execute("select 1")
-            assert cur.fetchone() == (1,)
-    finally:
-        p.close()
+        assert not conn.closed
 
 
 @pytest.mark.slow
@@ -796,18 +406,6 @@ def test_open_as_wait(dsn, monkeypatch):
         p.open(wait=True, timeout=0.5)
 
 
-def test_reopen(dsn):
-    p = pool.ConnectionPool(dsn)
-    with p.connection() as conn:
-        conn.execute("select 1")
-    p.close()
-    assert p._sched_runner is None
-    assert not p._workers
-
-    with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
-        p.open()
-
-
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.parametrize(
@@ -1084,12 +682,6 @@ def test_bad_resize(dsn, min_size, max_size):
             p.resize(min_size=min_size, max_size=max_size)
 
 
-def test_jitter():
-    rnds = [pool.ConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
-    assert 27 <= min(rnds) <= 28
-    assert 35 < max(rnds) < 36
-
-
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.crdb_skip("backend pid")
@@ -1148,79 +740,6 @@ def test_check_max_lifetime(dsn):
             assert conn.info.backend_pid != pid
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-def test_stats_measures(dsn):
-    def worker(n):
-        with p.connection() as conn:
-            conn.execute("select pg_sleep(0.2)")
-
-    with pool.ConnectionPool(dsn, min_size=2, max_size=4) as p:
-        p.wait(2.0)
-
-        stats = p.get_stats()
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 2
-        assert stats["pool_available"] == 2
-        assert stats["requests_waiting"] == 0
-
-        ts = [spawn(worker, args=(i,)) for i in range(3)]
-        sleep(0.1)
-        stats = p.get_stats()
-        gather(*ts)
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 3
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 0
-
-        p.wait(2.0)
-        ts = [spawn(worker, args=(i,)) for i in range(7)]
-        sleep(0.1)
-        stats = p.get_stats()
-        gather(*ts)
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 4
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-def test_stats_usage(dsn):
-    def worker(n):
-        try:
-            with p.connection(timeout=0.3) as conn:
-                conn.execute("select pg_sleep(0.2)")
-        except pool.PoolTimeout:
-            pass
-
-    with pool.ConnectionPool(dsn, min_size=3) as p:
-        p.wait(2.0)
-
-        ts = [spawn(worker, args=(i,)) for i in range(7)]
-        gather(*ts)
-        stats = p.get_stats()
-        assert stats["requests_num"] == 7
-        assert stats["requests_queued"] == 4
-        assert 850 <= stats["requests_wait_ms"] <= 950
-        assert stats["requests_errors"] == 1
-        assert 1150 <= stats["usage_ms"] <= 1250
-        assert stats.get("returns_bad", 0) == 0
-
-        with p.connection() as conn:
-            conn.close()
-        p.wait()
-        stats = p.pop_stats()
-        assert stats["requests_num"] == 8
-        assert stats["returns_bad"] == 1
-        with p.connection():
-            pass
-        assert p.get_stats()["requests_num"] == 1
-
-
 @pytest.mark.slow
 def test_stats_connect(dsn, proxy, monkeypatch):
     proxy.start()
index 92a8f7bf59a4b61d54c3c3cd4e494516cb1ad531..ef6db8d2286143fc27f196b8e3a7321baf73df9c 100644 (file)
@@ -10,7 +10,7 @@ from psycopg.pq import TransactionStatus
 from psycopg.rows import class_row, Row, TupleRow
 from psycopg._compat import assert_type, Counter
 
-from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async
+from ..utils import AEvent, spawn, gather, asleep, is_async
 
 try:
     import psycopg_pool as pool
@@ -22,13 +22,9 @@ if True:  # ASYNC
     pytestmark = [pytest.mark.anyio]
 
 
-async def test_defaults(dsn):
+async def test_default_sizes(dsn):
     async with pool.AsyncConnectionPool(dsn) as p:
         assert p.min_size == p.max_size == 4
-        assert p.timeout == 30
-        assert p.max_idle == 10 * 60
-        assert p.max_lifetime == 60 * 60
-        assert p.num_workers == 3
 
 
 @pytest.mark.parametrize("min_size, max_size", [(2, None), (0, 2), (2, 4)])
@@ -44,23 +40,6 @@ async def test_bad_size(dsn, min_size, max_size):
         pool.AsyncConnectionPool(min_size=min_size, max_size=max_size)
 
 
-async def test_connection_class(dsn):
-    class MyConn(psycopg.AsyncConnection[Any]):
-        pass
-
-    async with pool.AsyncConnectionPool(dsn, connection_class=MyConn, min_size=1) as p:
-        async with p.connection() as conn:
-            assert isinstance(conn, MyConn)
-
-
-async def test_kwargs(dsn):
-    async with pool.AsyncConnectionPool(
-        dsn, kwargs={"autocommit": True}, min_size=1
-    ) as p:
-        async with p.connection() as conn:
-            assert conn.autocommit
-
-
 class MyRow(Dict[str, Any]):
     ...
 
@@ -136,12 +115,6 @@ async def test_its_really_a_pool(dsn):
             assert conn.info.backend_pid in (pid1, pid2)
 
 
-async def test_context(dsn):
-    async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
-        assert not p.closed
-    assert p.closed
-
-
 @pytest.mark.crdb_skip("backend pid")
 async def test_connection_not_lost(dsn):
     async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
@@ -193,33 +166,6 @@ async def test_wait_ready(dsn, monkeypatch):
         await p.wait(0.0001)  # idempotent
 
 
-async def test_wait_closed(dsn):
-    async with pool.AsyncConnectionPool(dsn) as p:
-        pass
-
-    with pytest.raises(pool.PoolClosed):
-        await p.wait()
-
-
-@pytest.mark.slow
-async def test_setup_no_timeout(dsn, proxy):
-    with pytest.raises(pool.PoolTimeout):
-        async with pool.AsyncConnectionPool(
-            proxy.client_dsn, min_size=1, num_workers=1
-        ) as p:
-            await p.wait(0.2)
-
-    async with pool.AsyncConnectionPool(
-        proxy.client_dsn, min_size=1, num_workers=1
-    ) as p:
-        await asleep(0.5)
-        assert not p._pool
-        proxy.start()
-
-        async with p.connection() as conn:
-            await conn.execute("select 1")
-
-
 async def test_configure(dsn):
     inits = 0
 
@@ -248,37 +194,6 @@ async def test_configure(dsn):
             assert (await res.fetchone())[0] == "on"  # type: ignore[index]
 
 
-@pytest.mark.slow
-async def test_configure_badstate(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    async def configure(conn):
-        await conn.execute("select 1")
-
-    async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p:
-        with pytest.raises(pool.PoolTimeout):
-            await p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "INTRANS" in caplog.records[0].message
-
-
-@pytest.mark.slow
-async def test_configure_broken(dsn, caplog):
-    caplog.set_level(logging.WARNING, logger="psycopg.pool")
-
-    async def configure(conn):
-        async with conn.transaction():
-            await conn.execute("WAT")
-
-    async with pool.AsyncConnectionPool(dsn, min_size=1, configure=configure) as p:
-        with pytest.raises(pool.PoolTimeout):
-            await p.wait(timeout=0.5)
-
-    assert caplog.records
-    assert "WAT" in caplog.records[0].message
-
-
 async def test_reset(dsn):
     resets = 0
 
@@ -351,164 +266,6 @@ async def test_reset_broken(dsn, caplog):
     assert "WAT" in caplog.records[0].message
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue(dsn):
-    async def worker(n):
-        t0 = time()
-        async with p.connection() as conn:
-            await conn.execute("select pg_sleep(0.2)")
-            pid = conn.info.backend_pid
-        t1 = time()
-        results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
-        await p.wait()
-        ts = [spawn(worker, args=(i,)) for i in range(6)]
-        await gather(*ts)
-
-    times = [item[1] for item in results]
-    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-    for got, want in zip(times, want_times):
-        assert got == pytest.approx(want, 0.1), times
-
-    assert len(set(r[2] for r in results)) == 2, results
-
-
-@pytest.mark.slow
-async def test_queue_size(dsn):
-    async def worker(t, ev=None):
-        try:
-            async with p.connection():
-                if ev:
-                    ev.set()
-                await asleep(t)
-        except pool.TooManyRequests as e:
-            errors.append(e)
-        else:
-            success.append(True)
-
-    errors: List[Exception] = []
-    success: List[bool] = []
-
-    async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p:
-        await p.wait()
-        ev = AEvent()
-        spawn(worker, args=(0.3, ev))
-        await ev.wait()
-
-        ts = [spawn(worker, args=(0.1,)) for i in range(4)]
-        await gather(*ts)
-
-    assert len(success) == 4
-    assert len(errors) == 1
-    assert isinstance(errors[0], pool.TooManyRequests)
-    assert p.name in str(errors[0])
-    assert str(p.max_waiting) in str(errors[0])
-    assert p.get_stats()["requests_errors"] == 1
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout(dsn):
-    async def worker(n):
-        t0 = time()
-        try:
-            async with p.connection() as conn:
-                await conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except pool.PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-        ts = [spawn(worker, args=(i,)) for i in range(4)]
-        await gather(*ts)
-
-    assert len(results) == 2
-    assert len(errors) == 2
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_dead_client(dsn):
-    async def worker(i, timeout):
-        try:
-            async with p.connection(timeout=timeout) as conn:
-                await conn.execute("select pg_sleep(0.3)")
-                results.append(i)
-        except pool.PoolTimeout:
-            if timeout > 0.2:
-                raise
-
-    async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
-        results: List[int] = []
-        ts = [
-            spawn(worker, args=(i, timeout))
-            for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
-        ]
-        await gather(*ts)
-
-        await asleep(0.2)
-        assert set(results) == set([0, 1, 3, 4])
-        assert len(p._pool) == 2  # no connection was lost
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-@pytest.mark.crdb_skip("backend pid")
-async def test_queue_timeout_override(dsn):
-    async def worker(n):
-        t0 = time()
-        timeout = 0.25 if n == 3 else None
-        try:
-            async with p.connection(timeout=timeout) as conn:
-                await conn.execute("select pg_sleep(0.2)")
-                pid = conn.info.backend_pid
-        except pool.PoolTimeout as e:
-            t1 = time()
-            errors.append((n, t1 - t0, e))
-        else:
-            t1 = time()
-            results.append((n, t1 - t0, pid))
-
-    results: List[Tuple[int, float, int]] = []
-    errors: List[Tuple[int, float, Exception]] = []
-
-    async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-        ts = [spawn(worker, args=(i,)) for i in range(4)]
-        await gather(*ts)
-
-    assert len(results) == 3
-    assert len(errors) == 1
-    for e in errors:
-        assert 0.1 < e[1] < 0.15
-
-
-@pytest.mark.crdb_skip("backend pid")
-async def test_broken_reconnect(dsn):
-    async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
-        async with p.connection() as conn:
-            pid1 = conn.info.backend_pid
-            await conn.close()
-
-        async with p.connection() as conn2:
-            pid2 = conn2.info.backend_pid
-
-    assert pid1 != pid2
-
-
 @pytest.mark.crdb_skip("backend pid")
 async def test_intrans_rollback(dsn, caplog):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
@@ -604,38 +361,6 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
     assert "BAD" in caplog.records[2].message
 
 
-async def test_close_no_tasks(dsn):
-    p = pool.AsyncConnectionPool(dsn)
-    assert p._sched_runner and is_alive(p._sched_runner)
-    workers = p._workers[:]
-    assert workers
-    for t in workers:
-        assert is_alive(t)
-
-    await p.close()
-    assert p._sched_runner is None
-    assert not p._workers
-    for t in workers:
-        assert not is_alive(t)
-
-
-async def test_putconn_no_pool(aconn_cls, dsn):
-    async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
-        conn = await aconn_cls.connect(dsn)
-        with pytest.raises(ValueError):
-            await p.putconn(conn)
-
-    await conn.close()
-
-
-async def test_putconn_wrong_pool(dsn):
-    async with pool.AsyncConnectionPool(dsn, min_size=1) as p1:
-        async with pool.AsyncConnectionPool(dsn, min_size=1) as p2:
-            conn = await p1.getconn()
-            with pytest.raises(ValueError):
-                await p2.putconn(conn)
-
-
 async def test_del_no_warning(dsn, recwarn):
     p = pool.AsyncConnectionPool(dsn, min_size=2)
     async with p.connection() as conn:
@@ -648,134 +373,11 @@ async def test_del_no_warning(dsn, recwarn):
     assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
-@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
-async def test_del_stops_threads(dsn):
-    p = pool.AsyncConnectionPool(dsn)
-    assert p._sched_runner is not None
-    ts = [p._sched_runner] + p._workers
-    del p
-    await asleep(0.1)
-    for t in ts:
-        assert not is_alive(t), t
-
-
-async def test_closed_getconn(dsn):
-    p = pool.AsyncConnectionPool(dsn, min_size=1)
-    assert not p.closed
-    async with p.connection():
-        pass
-
-    await p.close()
-    assert p.closed
-
-    with pytest.raises(pool.PoolClosed):
-        async with p.connection():
-            pass
-
-
 async def test_closed_putconn(dsn):
-    p = pool.AsyncConnectionPool(dsn, min_size=1)
-
-    async with p.connection() as conn:
-        pass
-    assert not conn.closed
-
-    async with p.connection() as conn:
-        await p.close()
-    assert conn.closed
-
-
-async def test_closed_queue(dsn):
-    async def w1():
+    async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
         async with p.connection() as conn:
-            e1.set()  # Tell w0 that w1 got a connection
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-            await e2.wait()  # Wait until w0 has tested w2
-        success.append("w1")
-
-    async def w2():
-        try:
-            async with p.connection():
-                pass  # unexpected
-        except pool.PoolClosed:
-            success.append("w2")
-
-    e1 = AEvent()
-    e2 = AEvent()
-
-    p = pool.AsyncConnectionPool(dsn, min_size=1)
-    await p.wait()
-    success: List[str] = []
-
-    t1 = spawn(w1)
-    # Wait until w1 has received a connection
-    await e1.wait()
-
-    t2 = spawn(w2)
-    # Wait until w2 is in the queue
-    await ensure_waiting(p)
-    await p.close()
-
-    # Wait for the workers to finish
-    e2.set()
-    await gather(t1, t2)
-    assert len(success) == 2
-
-
-async def test_open_explicit(dsn):
-    p = pool.AsyncConnectionPool(dsn, open=False)
-    assert p.closed
-    with pytest.raises(pool.PoolClosed, match="is not open yet"):
-        await p.getconn()
-
-    with pytest.raises(pool.PoolClosed, match="is not open yet"):
-        async with p.connection():
             pass
-
-    await p.open()
-    try:
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    finally:
-        await p.close()
-
-    with pytest.raises(pool.PoolClosed, match="is already closed"):
-        await p.getconn()
-
-
-async def test_open_context(dsn):
-    p = pool.AsyncConnectionPool(dsn, open=False)
-    assert p.closed
-
-    async with p:
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    assert p.closed
-
-
-async def test_open_no_op(dsn):
-    p = pool.AsyncConnectionPool(dsn)
-    try:
-        assert not p.closed
-        await p.open()
-        assert not p.closed
-
-        async with p.connection() as conn:
-            cur = await conn.execute("select 1")
-            assert await cur.fetchone() == (1,)
-
-    finally:
-        await p.close()
+        assert not conn.closed
 
 
 @pytest.mark.slow
@@ -808,18 +410,6 @@ async def test_open_as_wait(dsn, monkeypatch):
         await p.open(wait=True, timeout=0.5)
 
 
-async def test_reopen(dsn):
-    p = pool.AsyncConnectionPool(dsn)
-    async with p.connection() as conn:
-        await conn.execute("select 1")
-    await p.close()
-    assert p._sched_runner is None
-    assert not p._workers
-
-    with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
-        await p.open()
-
-
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.parametrize(
@@ -1098,12 +688,6 @@ async def test_bad_resize(dsn, min_size, max_size):
             await p.resize(min_size=min_size, max_size=max_size)
 
 
-async def test_jitter():
-    rnds = [pool.AsyncConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
-    assert 27 <= min(rnds) <= 28
-    assert 35 < max(rnds) < 36
-
-
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.crdb_skip("backend pid")
@@ -1162,79 +746,6 @@ async def test_check_max_lifetime(dsn):
             assert conn.info.backend_pid != pid
 
 
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_measures(dsn):
-    async def worker(n):
-        async with p.connection() as conn:
-            await conn.execute("select pg_sleep(0.2)")
-
-    async with pool.AsyncConnectionPool(dsn, min_size=2, max_size=4) as p:
-        await p.wait(2.0)
-
-        stats = p.get_stats()
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 2
-        assert stats["pool_available"] == 2
-        assert stats["requests_waiting"] == 0
-
-        ts = [spawn(worker, args=(i,)) for i in range(3)]
-        await asleep(0.1)
-        stats = p.get_stats()
-        await gather(*ts)
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 3
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 0
-
-        await p.wait(2.0)
-        ts = [spawn(worker, args=(i,)) for i in range(7)]
-        await asleep(0.1)
-        stats = p.get_stats()
-        await gather(*ts)
-        assert stats["pool_min"] == 2
-        assert stats["pool_max"] == 4
-        assert stats["pool_size"] == 4
-        assert stats["pool_available"] == 0
-        assert stats["requests_waiting"] == 3
-
-
-@pytest.mark.slow
-@pytest.mark.timing
-async def test_stats_usage(dsn):
-    async def worker(n):
-        try:
-            async with p.connection(timeout=0.3) as conn:
-                await conn.execute("select pg_sleep(0.2)")
-        except pool.PoolTimeout:
-            pass
-
-    async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
-        await p.wait(2.0)
-
-        ts = [spawn(worker, args=(i,)) for i in range(7)]
-        await gather(*ts)
-        stats = p.get_stats()
-        assert stats["requests_num"] == 7
-        assert stats["requests_queued"] == 4
-        assert 850 <= stats["requests_wait_ms"] <= 950
-        assert stats["requests_errors"] == 1
-        assert 1150 <= stats["usage_ms"] <= 1250
-        assert stats.get("returns_bad", 0) == 0
-
-        async with p.connection() as conn:
-            await conn.close()
-        await p.wait()
-        stats = p.pop_stats()
-        assert stats["requests_num"] == 8
-        assert stats["returns_bad"] == 1
-        async with p.connection():
-            pass
-        assert p.get_stats()["requests_num"] == 1
-
-
 @pytest.mark.slow
 async def test_stats_connect(dsn, proxy, monkeypatch):
     proxy.start()
diff --git a/tests/pool/test_pool_common.py b/tests/pool/test_pool_common.py
new file mode 100644 (file)
index 0000000..1ae013e
--- /dev/null
@@ -0,0 +1,621 @@
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_pool_common_async.py'
+# DO NOT CHANGE! Change the original file instead.
+import logging
+from time import time
+from typing import Any, List, Tuple
+
+import pytest
+
+import psycopg
+
+from ..utils import Event, spawn, gather, sleep, is_alive, is_async
+
+try:
+    import psycopg_pool as pool
+except ImportError:
+    # Tests should have been skipped if the package is not available
+    pass
+
+
+@pytest.fixture(params=[pool.ConnectionPool, pool.NullConnectionPool])
+def pool_cls(request):
+    return request.param
+
+
+def test_defaults(pool_cls, dsn):
+    with pool_cls(dsn) as p:
+        assert p.timeout == 30
+        assert p.max_idle == 10 * 60
+        assert p.max_lifetime == 60 * 60
+        assert p.num_workers == 3
+
+
+def test_connection_class(pool_cls, dsn):
+    class MyConn(psycopg.Connection[Any]):
+        pass
+
+    with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p:
+        with p.connection() as conn:
+            assert isinstance(conn, MyConn)
+
+
+def test_kwargs(pool_cls, dsn):
+    with pool_cls(dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls)) as p:
+        with p.connection() as conn:
+            assert conn.autocommit
+
+
+def test_context(pool_cls, dsn):
+    with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+        assert not p.closed
+    assert p.closed
+
+
+def test_wait_closed(pool_cls, dsn):
+    with pool_cls(dsn) as p:
+        pass
+
+    with pytest.raises(pool.PoolClosed):
+        p.wait()
+
+
+@pytest.mark.slow
+def test_setup_no_timeout(pool_cls, dsn, proxy):
+    with pytest.raises(pool.PoolTimeout):
+        with pool_cls(
+            proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+        ) as p:
+            p.wait(0.2)
+
+    with pool_cls(proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1) as p:
+        sleep(0.5)
+        assert not p._pool
+        proxy.start()
+
+        with p.connection() as conn:
+            conn.execute("select 1")
+
+
+@pytest.mark.slow
+def test_configure_badstate(pool_cls, dsn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+    def configure(conn):
+        conn.execute("select 1")
+
+    with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+        with pytest.raises(pool.PoolTimeout):
+            p.wait(timeout=0.5)
+
+    assert caplog.records
+    assert "INTRANS" in caplog.records[0].message
+
+
+@pytest.mark.slow
+def test_configure_broken(pool_cls, dsn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+    def configure(conn):
+        with conn.transaction():
+            conn.execute("WAT")
+
+    with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+        with pytest.raises(pool.PoolTimeout):
+            p.wait(timeout=0.5)
+
+    assert caplog.records
+    assert "WAT" in caplog.records[0].message
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue(pool_cls, dsn):
+    def worker(n):
+        t0 = time()
+        with p.connection() as conn:
+            conn.execute("select pg_sleep(0.2)")
+            pid = conn.info.backend_pid
+        t1 = time()
+        results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+        p.wait()
+        ts = [spawn(worker, args=(i,)) for i in range(6)]
+        gather(*ts)
+
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.2), times
+
+    assert len(set((r[2] for r in results))) == 2, results
+
+
+@pytest.mark.slow
+def test_queue_size(pool_cls, dsn):
+    def worker(t, ev=None):
+        try:
+            with p.connection():
+                if ev:
+                    ev.set()
+                sleep(t)
+        except pool.TooManyRequests as e:
+            errors.append(e)
+        else:
+            success.append(True)
+
+    errors: List[Exception] = []
+    success: List[bool] = []
+
+    with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3) as p:
+        p.wait()
+        ev = Event()
+        spawn(worker, args=(0.3, ev))
+        ev.wait()
+
+        ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+        gather(*ts)
+
+    assert len(success) == 4
+    assert len(errors) == 1
+    assert isinstance(errors[0], pool.TooManyRequests)
+    assert p.name in str(errors[0])
+    assert str(p.max_waiting) in str(errors[0])
+    assert p.get_stats()["requests_errors"] == 1
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue_timeout(pool_cls, dsn):
+    def worker(n):
+        t0 = time()
+        try:
+            with p.connection() as conn:
+                conn.execute("select pg_sleep(0.2)")
+                pid = conn.info.backend_pid
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
+
+    with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
+        ts = [spawn(worker, args=(i,)) for i in range(4)]
+        gather(*ts)
+
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_dead_client(pool_cls, dsn):
+    def worker(i, timeout):
+        try:
+            with p.connection(timeout=timeout) as conn:
+                conn.execute("select pg_sleep(0.3)")
+                results.append(i)
+        except pool.PoolTimeout:
+            if timeout > 0.2:
+                raise
+
+    with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+        results: List[int] = []
+        ts = [
+            spawn(worker, args=(i, timeout))
+            for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
+        ]
+        gather(*ts)
+
+        sleep(0.2)
+        assert set(results) == set([0, 1, 3, 4])
+        if pool_cls is pool.ConnectionPool:
+            assert len(p._pool) == 2  # no connection was lost
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+def test_queue_timeout_override(pool_cls, dsn):
+    def worker(n):
+        t0 = time()
+        timeout = 0.25 if n == 3 else None
+        try:
+            with p.connection(timeout=timeout) as conn:
+                conn.execute("select pg_sleep(0.2)")
+                pid = conn.info.backend_pid
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
+
+    with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
+        ts = [spawn(worker, args=(i,)) for i in range(4)]
+        gather(*ts)
+
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.crdb_skip("backend pid")
+def test_broken_reconnect(pool_cls, dsn):
+    with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+        with p.connection() as conn:
+            pid1 = conn.info.backend_pid
+            conn.close()
+
+        with p.connection() as conn2:
+            pid2 = conn2.info.backend_pid
+
+    assert pid1 != pid2
+
+
+def test_close_no_tasks(pool_cls, dsn):
+    p = pool_cls(dsn)
+    assert p._sched_runner and is_alive(p._sched_runner)
+    workers = p._workers[:]
+    assert workers
+    for t in workers:
+        assert is_alive(t)
+
+    p.close()
+    assert p._sched_runner is None
+    assert not p._workers
+    for t in workers:
+        assert not is_alive(t)
+
+
+def test_putconn_no_pool(pool_cls, conn_cls, dsn):
+    with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+        conn = conn_cls.connect(dsn)
+        with pytest.raises(ValueError):
+            p.putconn(conn)
+
+    conn.close()
+
+
+def test_putconn_wrong_pool(pool_cls, dsn):
+    with pool_cls(dsn, min_size=min_size(pool_cls)) as p1:
+        with pool_cls(dsn, min_size=min_size(pool_cls)) as p2:
+            conn = p1.getconn()
+            with pytest.raises(ValueError):
+                p2.putconn(conn)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_del_stops_threads(pool_cls, dsn):
+    p = pool_cls(dsn)
+    assert p._sched_runner is not None
+    ts = [p._sched_runner] + p._workers
+    del p
+    sleep(0.1)
+    for t in ts:
+        assert not is_alive(t), t
+
+
+def test_closed_getconn(pool_cls, dsn):
+    p = pool_cls(dsn, min_size=min_size(pool_cls))
+    assert not p.closed
+    with p.connection():
+        pass
+
+    p.close()
+    assert p.closed
+
+    with pytest.raises(pool.PoolClosed):
+        with p.connection():
+            pass
+
+
+def test_close_connection_on_pool_close(pool_cls, dsn):
+    p = pool_cls(dsn, min_size=min_size(pool_cls))
+    with p.connection() as conn:
+        p.close()
+    assert conn.closed
+
+
+def test_closed_queue(pool_cls, dsn):
+    def w1():
+        with p.connection() as conn:
+            e1.set()  # Tell w0 that w1 got a connection
+            cur = conn.execute("select 1")
+            assert cur.fetchone() == (1,)
+            e2.wait()  # Wait until w0 has tested w2
+        success.append("w1")
+
+    def w2():
+        try:
+            with p.connection():
+                pass  # unexpected
+        except pool.PoolClosed:
+            success.append("w2")
+
+    e1 = Event()
+    e2 = Event()
+
+    p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1)
+    p.wait()
+    success: List[str] = []
+
+    t1 = spawn(w1)
+    # Wait until w1 has received a connection
+    e1.wait()
+
+    t2 = spawn(w2)
+    # Wait until w2 is in the queue
+    ensure_waiting(p)
+    p.close()
+
+    # Wait for the workers to finish
+    e2.set()
+    gather(t1, t2)
+    assert len(success) == 2
+
+
+def test_open_explicit(pool_cls, dsn):
+    p = pool_cls(dsn, open=False)
+    assert p.closed
+    with pytest.raises(pool.PoolClosed, match="is not open yet"):
+        p.getconn()
+
+    with pytest.raises(pool.PoolClosed, match="is not open yet"):
+        with p.connection():
+            pass
+
+    p.open()
+    try:
+        assert not p.closed
+
+        with p.connection() as conn:
+            cur = conn.execute("select 1")
+            assert cur.fetchone() == (1,)
+    finally:
+        p.close()
+
+    with pytest.raises(pool.PoolClosed, match="is already closed"):
+        p.getconn()
+
+
+def test_open_context(pool_cls, dsn):
+    p = pool_cls(dsn, open=False)
+    assert p.closed
+
+    with p:
+        assert not p.closed
+
+        with p.connection() as conn:
+            cur = conn.execute("select 1")
+            assert cur.fetchone() == (1,)
+
+    assert p.closed
+
+
+def test_open_no_op(pool_cls, dsn):
+    p = pool_cls(dsn)
+    try:
+        assert not p.closed
+        p.open()
+        assert not p.closed
+
+        with p.connection() as conn:
+            cur = conn.execute("select 1")
+            assert cur.fetchone() == (1,)
+    finally:
+        p.close()
+
+
+def test_reopen(pool_cls, dsn):
+    p = pool_cls(dsn)
+    with p.connection() as conn:
+        conn.execute("select 1")
+    p.close()
+    assert p._sched_runner is None
+    assert not p._workers
+
+    with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
+        p.open()
+
+
+def test_jitter(pool_cls):
+    rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)]
+    assert 27 <= min(rnds) <= 28
+    assert 35 < max(rnds) < 36
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_stats_measures(pool_cls, dsn):
+    def worker(n):
+        with p.connection() as conn:
+            conn.execute("select pg_sleep(0.2)")
+
+    with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p:
+        p.wait(2.0)
+
+        stats = p.get_stats()
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == min_size(pool_cls, 2)
+        assert stats["pool_available"] == min_size(pool_cls, 2)
+        assert stats["requests_waiting"] == 0
+
+        ts = [spawn(worker, args=(i,)) for i in range(3)]
+        sleep(0.1)
+        stats = p.get_stats()
+        gather(*ts)
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 3
+        assert stats["pool_available"] == 0
+        assert stats["requests_waiting"] == 0
+
+        p.wait(2.0)
+        ts = [spawn(worker, args=(i,)) for i in range(7)]
+        sleep(0.1)
+        stats = p.get_stats()
+        gather(*ts)
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 4
+        assert stats["pool_available"] == 0
+        assert stats["requests_waiting"] == 3
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_stats_usage(pool_cls, dsn):
+    def worker(n):
+        try:
+            with p.connection(timeout=0.3) as conn:
+                conn.execute("select pg_sleep(0.2)")
+        except pool.PoolTimeout:
+            pass
+
+    with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p:
+        p.wait(2.0)
+
+        ts = [spawn(worker, args=(i,)) for i in range(7)]
+        gather(*ts)
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        with p.connection() as conn:
+            conn.close()
+        p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
+
+
+def test_debug_deadlock(pool_cls, dsn):
+    # https://github.com/psycopg/psycopg/issues/230
+    logger = logging.getLogger("psycopg")
+    handler = logging.StreamHandler()
+    old_level = logger.level
+    logger.setLevel(logging.DEBUG)
+    handler.setLevel(logging.DEBUG)
+    logger.addHandler(handler)
+    try:
+        with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+            p.wait(timeout=2)
+    finally:
+        logger.removeHandler(handler)
+        logger.setLevel(old_level)
+
+
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+def test_cancellation_in_queue(pool_cls, dsn):
+    # https://github.com/psycopg/psycopg/issues/509
+
+    nconns = 3
+
+    with pool_cls(
+        dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1
+    ) as p:
+        p.wait()
+
+        got_conns = []
+        ev = Event()
+
+        def worker(i):
+            try:
+                logging.info("worker %s started", i)
+                nonlocal got_conns
+
+                with p.connection() as conn:
+                    logging.info("worker %s got conn", i)
+                    cur = conn.execute("select 1")
+                    assert cur.fetchone() == (1,)
+
+                    got_conns.append(conn)
+                    if len(got_conns) >= nconns:
+                        ev.set()
+
+                    sleep(5)
+            except BaseException as ex:
+                logging.info("worker %s stopped: %r", i, ex)
+                raise
+
+        # Start tasks taking up all the connections and getting in the queue
+        tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
+
+        # wait until the pool has served all the connections and clients are queued.
+        ev.wait(3.0)
+        for i in range(10):
+            if p.get_stats().get("requests_queued", 0):
+                break
+            else:
+                sleep(0.1)
+        else:
+            pytest.fail("no client got in the queue")
+
+        [task.cancel() for task in reversed(tasks)]
+        gather(*tasks, return_exceptions=True, timeout=1.0)
+
+        stats = p.get_stats()
+        assert stats["pool_available"] == min_size(pool_cls, nconns)
+        assert stats.get("requests_waiting", 0) == 0
+
+        with p.connection() as conn:
+            cur = conn.execute("select 1")
+            assert cur.fetchone() == (1,)
+
+
+def min_size(pool_cls, num=1):
+    """Return the minimum min_size supported by the pool class."""
+    if pool_cls is pool.ConnectionPool:
+        return num
+    elif pool_cls is pool.NullConnectionPool:
+        return 0
+    else:
+        assert False, pool_cls
+
+
+def delay_connection(monkeypatch, sec):
+    """
+    Return a _connect_gen function delayed by the amount of seconds
+    """
+
+    def connect_delay(*args, **kwargs):
+        t0 = time()
+        rv = connect_orig(*args, **kwargs)
+        t1 = time()
+        sleep(max(0, sec - (t1 - t0)))
+        return rv
+
+    connect_orig = psycopg.Connection.connect
+    monkeypatch.setattr(psycopg.Connection, "connect", connect_delay)
+
+
+def ensure_waiting(p, num=1):
+    """
+    Wait until there are at least *num* clients waiting in the queue.
+    """
+    while len(p._waiting) < num:
+        sleep(0)
diff --git a/tests/pool/test_pool_common_async.py b/tests/pool/test_pool_common_async.py
new file mode 100644 (file)
index 0000000..61472e0
--- /dev/null
@@ -0,0 +1,634 @@
+import logging
+from time import time
+from typing import Any, List, Tuple
+
+import pytest
+
+import psycopg
+
+from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async
+
+try:
+    import psycopg_pool as pool
+except ImportError:
+    # Tests should have been skipped if the package is not available
+    pass
+
+if True:  # ASYNC
+    pytestmark = [pytest.mark.anyio]
+
+
+@pytest.fixture(params=[pool.AsyncConnectionPool, pool.AsyncNullConnectionPool])
+def pool_cls(request):
+    return request.param
+
+
+async def test_defaults(pool_cls, dsn):
+    async with pool_cls(dsn) as p:
+        assert p.timeout == 30
+        assert p.max_idle == 10 * 60
+        assert p.max_lifetime == 60 * 60
+        assert p.num_workers == 3
+
+
+async def test_connection_class(pool_cls, dsn):
+    class MyConn(psycopg.AsyncConnection[Any]):
+        pass
+
+    async with pool_cls(dsn, connection_class=MyConn, min_size=min_size(pool_cls)) as p:
+        async with p.connection() as conn:
+            assert isinstance(conn, MyConn)
+
+
+async def test_kwargs(pool_cls, dsn):
+    async with pool_cls(
+        dsn, kwargs={"autocommit": True}, min_size=min_size(pool_cls)
+    ) as p:
+        async with p.connection() as conn:
+            assert conn.autocommit
+
+
+async def test_context(pool_cls, dsn):
+    async with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+        assert not p.closed
+    assert p.closed
+
+
+async def test_wait_closed(pool_cls, dsn):
+    async with pool_cls(dsn) as p:
+        pass
+
+    with pytest.raises(pool.PoolClosed):
+        await p.wait()
+
+
+@pytest.mark.slow
+async def test_setup_no_timeout(pool_cls, dsn, proxy):
+    with pytest.raises(pool.PoolTimeout):
+        async with pool_cls(
+            proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+        ) as p:
+            await p.wait(0.2)
+
+    async with pool_cls(
+        proxy.client_dsn, min_size=min_size(pool_cls), num_workers=1
+    ) as p:
+        await asleep(0.5)
+        assert not p._pool
+        proxy.start()
+
+        async with p.connection() as conn:
+            await conn.execute("select 1")
+
+
+@pytest.mark.slow
+async def test_configure_badstate(pool_cls, dsn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+    async def configure(conn):
+        await conn.execute("select 1")
+
+    async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+        with pytest.raises(pool.PoolTimeout):
+            await p.wait(timeout=0.5)
+
+    assert caplog.records
+    assert "INTRANS" in caplog.records[0].message
+
+
+@pytest.mark.slow
+async def test_configure_broken(pool_cls, dsn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg.pool")
+
+    async def configure(conn):
+        async with conn.transaction():
+            await conn.execute("WAT")
+
+    async with pool_cls(dsn, min_size=min_size(pool_cls), configure=configure) as p:
+        with pytest.raises(pool.PoolTimeout):
+            await p.wait(timeout=0.5)
+
+    assert caplog.records
+    assert "WAT" in caplog.records[0].message
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue(pool_cls, dsn):
+    async def worker(n):
+        t0 = time()
+        async with p.connection() as conn:
+            await conn.execute("select pg_sleep(0.2)")
+            pid = conn.info.backend_pid
+        t1 = time()
+        results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+        await p.wait()
+        ts = [spawn(worker, args=(i,)) for i in range(6)]
+        await gather(*ts)
+
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.2), times
+
+    assert len(set(r[2] for r in results)) == 2, results
+
+
+@pytest.mark.slow
+async def test_queue_size(pool_cls, dsn):
+    async def worker(t, ev=None):
+        try:
+            async with p.connection():
+                if ev:
+                    ev.set()
+                await asleep(t)
+        except pool.TooManyRequests as e:
+            errors.append(e)
+        else:
+            success.append(True)
+
+    errors: List[Exception] = []
+    success: List[bool] = []
+
+    async with pool_cls(
+        dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3
+    ) as p:
+        await p.wait()
+        ev = AEvent()
+        spawn(worker, args=(0.3, ev))
+        await ev.wait()
+
+        ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+        await gather(*ts)
+
+    assert len(success) == 4
+    assert len(errors) == 1
+    assert isinstance(errors[0], pool.TooManyRequests)
+    assert p.name in str(errors[0])
+    assert str(p.max_waiting) in str(errors[0])
+    assert p.get_stats()["requests_errors"] == 1
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue_timeout(pool_cls, dsn):
+    async def worker(n):
+        t0 = time()
+        try:
+            async with p.connection() as conn:
+                await conn.execute("select pg_sleep(0.2)")
+                pid = conn.info.backend_pid
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
+
+    async with pool_cls(
+        dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
+    ) as p:
+        ts = [spawn(worker, args=(i,)) for i in range(4)]
+        await gather(*ts)
+
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_dead_client(pool_cls, dsn):
+    async def worker(i, timeout):
+        try:
+            async with p.connection(timeout=timeout) as conn:
+                await conn.execute("select pg_sleep(0.3)")
+                results.append(i)
+        except pool.PoolTimeout:
+            if timeout > 0.2:
+                raise
+
+    async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
+        results: List[int] = []
+        ts = [
+            spawn(worker, args=(i, timeout))
+            for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
+        ]
+        await gather(*ts)
+
+        await asleep(0.2)
+        assert set(results) == set([0, 1, 3, 4])
+        if pool_cls is pool.AsyncConnectionPool:
+            assert len(p._pool) == 2  # no connection was lost
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.crdb_skip("backend pid")
+async def test_queue_timeout_override(pool_cls, dsn):
+    async def worker(n):
+        t0 = time()
+        timeout = 0.25 if n == 3 else None
+        try:
+            async with p.connection(timeout=timeout) as conn:
+                await conn.execute("select pg_sleep(0.2)")
+                pid = conn.info.backend_pid
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
+
+    async with pool_cls(
+        dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
+    ) as p:
+        ts = [spawn(worker, args=(i,)) for i in range(4)]
+        await gather(*ts)
+
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.crdb_skip("backend pid")
+async def test_broken_reconnect(pool_cls, dsn):
+    async with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+        async with p.connection() as conn:
+            pid1 = conn.info.backend_pid
+            await conn.close()
+
+        async with p.connection() as conn2:
+            pid2 = conn2.info.backend_pid
+
+    assert pid1 != pid2
+
+
+async def test_close_no_tasks(pool_cls, dsn):
+    p = pool_cls(dsn)
+    assert p._sched_runner and is_alive(p._sched_runner)
+    workers = p._workers[:]
+    assert workers
+    for t in workers:
+        assert is_alive(t)
+
+    await p.close()
+    assert p._sched_runner is None
+    assert not p._workers
+    for t in workers:
+        assert not is_alive(t)
+
+
+async def test_putconn_no_pool(pool_cls, aconn_cls, dsn):
+    async with pool_cls(dsn, min_size=min_size(pool_cls)) as p:
+        conn = await aconn_cls.connect(dsn)
+        with pytest.raises(ValueError):
+            await p.putconn(conn)
+
+    await conn.close()
+
+
+async def test_putconn_wrong_pool(pool_cls, dsn):
+    async with pool_cls(dsn, min_size=min_size(pool_cls)) as p1:
+        async with pool_cls(dsn, min_size=min_size(pool_cls)) as p2:
+            conn = await p1.getconn()
+            with pytest.raises(ValueError):
+                await p2.putconn(conn)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+async def test_del_stops_threads(pool_cls, dsn):
+    p = pool_cls(dsn)
+    assert p._sched_runner is not None
+    ts = [p._sched_runner] + p._workers
+    del p
+    await asleep(0.1)
+    for t in ts:
+        assert not is_alive(t), t
+
+
+async def test_closed_getconn(pool_cls, dsn):
+    p = pool_cls(dsn, min_size=min_size(pool_cls))
+    assert not p.closed
+    async with p.connection():
+        pass
+
+    await p.close()
+    assert p.closed
+
+    with pytest.raises(pool.PoolClosed):
+        async with p.connection():
+            pass
+
+
+async def test_close_connection_on_pool_close(pool_cls, dsn):
+    p = pool_cls(dsn, min_size=min_size(pool_cls))
+    async with p.connection() as conn:
+        await p.close()
+    assert conn.closed
+
+
+async def test_closed_queue(pool_cls, dsn):
+    async def w1():
+        async with p.connection() as conn:
+            e1.set()  # Tell w0 that w1 got a connection
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+            await e2.wait()  # Wait until w0 has tested w2
+        success.append("w1")
+
+    async def w2():
+        try:
+            async with p.connection():
+                pass  # unexpected
+        except pool.PoolClosed:
+            success.append("w2")
+
+    e1 = AEvent()
+    e2 = AEvent()
+
+    p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1)
+    await p.wait()
+    success: List[str] = []
+
+    t1 = spawn(w1)
+    # Wait until w1 has received a connection
+    await e1.wait()
+
+    t2 = spawn(w2)
+    # Wait until w2 is in the queue
+    await ensure_waiting(p)
+    await p.close()
+
+    # Wait for the workers to finish
+    e2.set()
+    await gather(t1, t2)
+    assert len(success) == 2
+
+
+async def test_open_explicit(pool_cls, dsn):
+    p = pool_cls(dsn, open=False)
+    assert p.closed
+    with pytest.raises(pool.PoolClosed, match="is not open yet"):
+        await p.getconn()
+
+    with pytest.raises(pool.PoolClosed, match="is not open yet"):
+        async with p.connection():
+            pass
+
+    await p.open()
+    try:
+        assert not p.closed
+
+        async with p.connection() as conn:
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+
+    finally:
+        await p.close()
+
+    with pytest.raises(pool.PoolClosed, match="is already closed"):
+        await p.getconn()
+
+
+async def test_open_context(pool_cls, dsn):
+    p = pool_cls(dsn, open=False)
+    assert p.closed
+
+    async with p:
+        assert not p.closed
+
+        async with p.connection() as conn:
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+
+    assert p.closed
+
+
+async def test_open_no_op(pool_cls, dsn):
+    p = pool_cls(dsn)
+    try:
+        assert not p.closed
+        await p.open()
+        assert not p.closed
+
+        async with p.connection() as conn:
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+
+    finally:
+        await p.close()
+
+
+async def test_reopen(pool_cls, dsn):
+    p = pool_cls(dsn)
+    async with p.connection() as conn:
+        await conn.execute("select 1")
+    await p.close()
+    assert p._sched_runner is None
+    assert not p._workers
+
+    with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
+        await p.open()
+
+
+async def test_jitter(pool_cls):
+    rnds = [pool_cls._jitter(30, -0.1, +0.2) for i in range(100)]
+    assert 27 <= min(rnds) <= 28
+    assert 35 < max(rnds) < 36
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_stats_measures(pool_cls, dsn):
+    async def worker(n):
+        async with p.connection() as conn:
+            await conn.execute("select pg_sleep(0.2)")
+
+    async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=4) as p:
+        await p.wait(2.0)
+
+        stats = p.get_stats()
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == min_size(pool_cls, 2)
+        assert stats["pool_available"] == min_size(pool_cls, 2)
+        assert stats["requests_waiting"] == 0
+
+        ts = [spawn(worker, args=(i,)) for i in range(3)]
+        await asleep(0.1)
+        stats = p.get_stats()
+        await gather(*ts)
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 3
+        assert stats["pool_available"] == 0
+        assert stats["requests_waiting"] == 0
+
+        await p.wait(2.0)
+        ts = [spawn(worker, args=(i,)) for i in range(7)]
+        await asleep(0.1)
+        stats = p.get_stats()
+        await gather(*ts)
+        assert stats["pool_min"] == min_size(pool_cls, 2)
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 4
+        assert stats["pool_available"] == 0
+        assert stats["requests_waiting"] == 3
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_stats_usage(pool_cls, dsn):
+    async def worker(n):
+        try:
+            async with p.connection(timeout=0.3) as conn:
+                await conn.execute("select pg_sleep(0.2)")
+        except pool.PoolTimeout:
+            pass
+
+    async with pool_cls(dsn, min_size=min_size(pool_cls, 3), max_size=3) as p:
+        await p.wait(2.0)
+
+        ts = [spawn(worker, args=(i,)) for i in range(7)]
+        await gather(*ts)
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        async with p.connection() as conn:
+            await conn.close()
+        await p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        async with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
+
+
+async def test_debug_deadlock(pool_cls, dsn):
+    # https://github.com/psycopg/psycopg/issues/230
+    logger = logging.getLogger("psycopg")
+    handler = logging.StreamHandler()
+    old_level = logger.level
+    logger.setLevel(logging.DEBUG)
+    handler.setLevel(logging.DEBUG)
+    logger.addHandler(handler)
+    try:
+        async with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+            await p.wait(timeout=2)
+    finally:
+        logger.removeHandler(handler)
+        logger.setLevel(old_level)
+
+
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+async def test_cancellation_in_queue(pool_cls, dsn):
+    # https://github.com/psycopg/psycopg/issues/509
+
+    nconns = 3
+
+    async with pool_cls(
+        dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1
+    ) as p:
+        await p.wait()
+
+        got_conns = []
+        ev = AEvent()
+
+        async def worker(i):
+            try:
+                logging.info("worker %s started", i)
+                nonlocal got_conns
+
+                async with p.connection() as conn:
+                    logging.info("worker %s got conn", i)
+                    cur = await conn.execute("select 1")
+                    assert (await cur.fetchone()) == (1,)
+
+                    got_conns.append(conn)
+                    if len(got_conns) >= nconns:
+                        ev.set()
+
+                    await asleep(5)
+
+            except BaseException as ex:
+                logging.info("worker %s stopped: %r", i, ex)
+                raise
+
+        # Start tasks taking up all the connections and getting in the queue
+        tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
+
+        # wait until the pool has served all the connections and clients are queued.
+        await ev.wait_timeout(3.0)
+        for i in range(10):
+            if p.get_stats().get("requests_queued", 0):
+                break
+            else:
+                await asleep(0.1)
+        else:
+            pytest.fail("no client got in the queue")
+
+        [task.cancel() for task in reversed(tasks)]
+        await gather(*tasks, return_exceptions=True, timeout=1.0)
+
+        stats = p.get_stats()
+        assert stats["pool_available"] == min_size(pool_cls, nconns)
+        assert stats.get("requests_waiting", 0) == 0
+
+        async with p.connection() as conn:
+            cur = await conn.execute("select 1")
+            assert await cur.fetchone() == (1,)
+
+
+def min_size(pool_cls, num=1):
+    """Return the minimum min_size supported by the pool class."""
+    if pool_cls is pool.AsyncConnectionPool:
+        return num
+    elif pool_cls is pool.AsyncNullConnectionPool:
+        return 0
+    else:
+        assert False, pool_cls
+
+
+def delay_connection(monkeypatch, sec):
+    """
+    Return a _connect_gen function delayed by the amount of seconds
+    """
+
+    async def connect_delay(*args, **kwargs):
+        t0 = time()
+        rv = await connect_orig(*args, **kwargs)
+        t1 = time()
+        await asleep(max(0, sec - (t1 - t0)))
+        return rv
+
+    connect_orig = psycopg.AsyncConnection.connect
+    monkeypatch.setattr(psycopg.AsyncConnection, "connect", connect_delay)
+
+
+async def ensure_waiting(p, num=1):
+    """
+    Wait until there are at least *num* clients waiting in the queue.
+    """
+    while len(p._waiting) < num:
+        await asleep(0)
index 4183d722219c7c9fad0adbed86e06d0a06287291..3ede5997faffde076bd825ab7044108aeb312f3d 100755 (executable)
@@ -163,6 +163,7 @@ class RenameAsyncToSync(ast.NodeTransformer):
         "AsyncGenerator": "Generator",
         "AsyncIterator": "Iterator",
         "AsyncLibpqWriter": "LibpqWriter",
+        "AsyncNullConnectionPool": "NullConnectionPool",
         "AsyncPipeline": "Pipeline",
         "AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
         "AsyncRawCursor": "RawCursor",
index 843fd9be364dc3258b3087db969caded3d11192a..1b05fda1f65bd1f270240acdbd321ae14c233967 100755 (executable)
@@ -22,6 +22,7 @@ for async in \
     psycopg/psycopg/cursor_async.py \
     psycopg_pool/psycopg_pool/sched_async.py \
     tests/pool/test_pool_async.py \
+    tests/pool/test_pool_common_async.py \
     tests/pool/test_sched_async.py \
     tests/test_connection_async.py \
     tests/test_copy_async.py \