]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
More robust null pool tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 8 Jan 2022 18:37:14 +0000 (19:37 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 9 Jan 2022 17:48:04 +0000 (18:48 +0100)
Fix race conditions of tasks completing before a concurrent client is
queued.

Fix test relying on fixed broken behaviour of copy().

Use connection.info rather than connection.pgconn to obtain higher level
results (e.g. Enum instead of int).

tests/pool/test_null_pool.py
tests/pool/test_null_pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index 9747c5e198e826163bebf2055bcc6524c4c2b10b..74b880f6cc0e5ee602d29bca2af96580ca776264 100644 (file)
@@ -9,7 +9,7 @@ from packaging.version import parse as ver  # noqa: F401  # used in skipif
 import psycopg
 from psycopg.pq import TransactionStatus
 
-from .test_pool import delay_connection
+from .test_pool import delay_connection, ensure_waiting
 
 try:
     from psycopg_pool import NullConnectionPool
@@ -67,7 +67,7 @@ def test_its_no_pool_at_all(dsn):
                     (pid2,) = cur.fetchone()  # type: ignore[misc]
 
         with p.connection() as conn:
-            assert conn.pgconn.backend_pid not in (pid1, pid2)
+            assert conn.info.backend_pid not in (pid1, pid2)
 
 
 def test_context(dsn):
@@ -189,7 +189,7 @@ def test_reset(dsn):
             assert resets == 1
             with conn.execute("show timezone") as cur:
                 assert cur.fetchone() == ("UTC",)
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
         with p.connection() as conn:
@@ -198,10 +198,11 @@ def test_reset(dsn):
             # instead of making a new one.
             t = Thread(target=worker)
             t.start()
+            ensure_waiting(p)
 
             assert resets == 0
             conn.execute("set timezone to '+2:00'")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         t.join()
         p.wait()
@@ -221,16 +222,17 @@ def test_reset_badstate(dsn, caplog):
     def worker():
         with p.connection() as conn:
             conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
         with p.connection() as conn:
 
             t = Thread(target=worker)
             t.start()
+            ensure_waiting(p)
 
             conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         t.join()
 
@@ -251,16 +253,17 @@ def test_reset_broken(dsn, caplog):
     def worker():
         with p.connection() as conn:
             conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
         with p.connection() as conn:
 
             t = Thread(target=worker)
             t.start()
+            ensure_waiting(p)
 
             conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         t.join()
 
@@ -466,8 +469,8 @@ def test_intrans_rollback(dsn, caplog):
 
     def worker():
         with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
             assert not conn.execute(
                 "select 1 from pg_class where relname = 'test_intrans_rollback'"
             ).fetchone()
@@ -479,10 +482,11 @@ def test_intrans_rollback(dsn, caplog):
         # of making a new one.
         t = Thread(target=worker)
         t.start()
+        ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         conn.execute("create table test_intrans_rollback ()")
-        assert conn.pgconn.transaction_status == TransactionStatus.INTRANS
+        assert conn.info.transaction_status == TransactionStatus.INTRANS
         p.putconn(conn)
         t.join()
 
@@ -497,8 +501,8 @@ def test_inerror_rollback(dsn, caplog):
 
     def worker():
         with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     with NullConnectionPool(dsn, max_size=1) as p:
         conn = p.getconn()
@@ -507,11 +511,12 @@ def test_inerror_rollback(dsn, caplog):
         # of making a new one.
         t = Thread(target=worker)
         t.start()
+        ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         with pytest.raises(psycopg.ProgrammingError):
             conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         p.putconn(conn)
         t.join()
 
@@ -526,20 +531,21 @@ def test_active_close(dsn, caplog):
 
     def worker():
         with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     with NullConnectionPool(dsn, max_size=1) as p:
         conn = p.getconn()
 
         t = Thread(target=worker)
         t.start()
+        ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
-        cur = conn.cursor()
-        with cur.copy("copy (select * from generate_series(1, 10)) to stdout"):
-            pass
-        assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE
+        pids.append(conn.info.backend_pid)
+        conn.pgconn.exec_(
+            b"copy (select * from generate_series(1, 10)) to stdout"
+        )
+        assert conn.info.transaction_status == TransactionStatus.ACTIVE
         p.putconn(conn)
         t.join()
 
@@ -555,8 +561,8 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch):
 
     def worker(p):
         with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     with NullConnectionPool(dsn, max_size=1) as p:
         conn = p.getconn()
@@ -571,11 +577,12 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch):
 
         t = Thread(target=worker, args=(p,))
         t.start()
+        ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         with pytest.raises(psycopg.ProgrammingError):
             conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         p.putconn(conn)
         t.join()
 
@@ -686,8 +693,7 @@ def test_closed_queue(dsn):
     t2 = Thread(target=w2)
     t2.start()
     # Wait until w2 is in the queue
-    while not p._waiting:
-        sleep(0)
+    ensure_waiting(p)
 
     p.close(0)
 
@@ -780,7 +786,7 @@ def test_max_lifetime(dsn):
 
     def worker(p):
         with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
             sleep(0.1)
 
     ts = []
index 9175307e7a92961a73983dfb05448bab010722db..fa31984b8915f6ad1ae257363d90683caeb3ea8e 100644 (file)
@@ -10,7 +10,7 @@ from packaging.version import parse as ver  # noqa: F401  # used in skipif
 import psycopg
 from psycopg.pq import TransactionStatus
 from psycopg._compat import create_task
-from .test_pool_async import delay_connection
+from .test_pool_async import delay_connection, ensure_waiting
 
 pytestmark = [
     pytest.mark.asyncio,
@@ -76,7 +76,7 @@ async def test_its_no_pool_at_all(dsn):
                 (pid2,) = await cur.fetchone()  # type: ignore[misc]
 
         async with p.connection() as conn:
-            assert conn.pgconn.backend_pid not in (pid1, pid2)
+            assert conn.info.backend_pid not in (pid1, pid2)
 
 
 async def test_context(dsn):
@@ -200,7 +200,7 @@ async def test_reset(dsn):
             assert resets == 1
             cur = await conn.execute("show timezone")
             assert (await cur.fetchone()) == ("UTC",)
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
@@ -208,10 +208,11 @@ async def test_reset(dsn):
             # Queue the worker so it will take the same connection a second time
             # instead of making a new one.
             t = create_task(worker())
+            await ensure_waiting(p)
 
             assert resets == 0
             await conn.execute("set timezone to '+2:00'")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         await asyncio.gather(t)
         await p.wait()
@@ -231,15 +232,16 @@ async def test_reset_badstate(dsn, caplog):
     async def worker():
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
 
             t = create_task(worker())
+            await ensure_waiting(p)
 
             await conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         await asyncio.gather(t)
 
@@ -260,15 +262,16 @@ async def test_reset_broken(dsn, caplog):
     async def worker():
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
     async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
         async with p.connection() as conn:
 
             t = create_task(worker())
+            await ensure_waiting(p)
 
             await conn.execute("select 1")
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
 
         await asyncio.gather(t)
 
@@ -465,8 +468,8 @@ async def test_intrans_rollback(dsn, caplog):
 
     async def worker():
         async with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
             cur = await conn.execute(
                 "select 1 from pg_class where relname = 'test_intrans_rollback'"
             )
@@ -478,10 +481,11 @@ async def test_intrans_rollback(dsn, caplog):
         # Queue the worker so it will take the connection a second time instead
         # of making a new one.
         t = create_task(worker())
+        await ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         await conn.execute("create table test_intrans_rollback ()")
-        assert conn.pgconn.transaction_status == TransactionStatus.INTRANS
+        assert conn.info.transaction_status == TransactionStatus.INTRANS
         await p.putconn(conn)
         await asyncio.gather(t)
 
@@ -496,18 +500,19 @@ async def test_inerror_rollback(dsn, caplog):
 
     async def worker():
         async with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     async with AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
 
         t = create_task(worker())
+        await ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         with pytest.raises(psycopg.ProgrammingError):
             await conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         await p.putconn(conn)
         await asyncio.gather(t)
 
@@ -522,21 +527,20 @@ async def test_active_close(dsn, caplog):
 
     async def worker():
         async with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     async with AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
 
         t = create_task(worker())
+        await ensure_waiting(p)
 
-        pids.append(conn.pgconn.backend_pid)
-        cur = conn.cursor()
-        async with cur.copy(
-            "copy (select * from generate_series(1, 10)) to stdout"
-        ):
-            pass
-        assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE
+        pids.append(conn.info.backend_pid)
+        conn.pgconn.exec_(
+            b"copy (select * from generate_series(1, 10)) to stdout"
+        )
+        assert conn.info.transaction_status == TransactionStatus.ACTIVE
         await p.putconn(conn)
         await asyncio.gather(t)
 
@@ -552,12 +556,13 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
 
     async def worker():
         async with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
-            assert conn.pgconn.transaction_status == TransactionStatus.IDLE
+            pids.append(conn.info.backend_pid)
+            assert conn.info.transaction_status == TransactionStatus.IDLE
 
     async with AsyncNullConnectionPool(dsn, max_size=1) as p:
         conn = await p.getconn()
         t = create_task(worker())
+        await ensure_waiting(p)
 
         async def bad_rollback():
             conn.pgconn.finish()
@@ -567,10 +572,10 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
         orig_rollback = conn.rollback
         monkeypatch.setattr(conn, "rollback", bad_rollback)
 
-        pids.append(conn.pgconn.backend_pid)
+        pids.append(conn.info.backend_pid)
         with pytest.raises(psycopg.ProgrammingError):
             await conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         await p.putconn(conn)
         await asyncio.gather(t)
 
@@ -668,9 +673,7 @@ async def test_closed_queue(dsn):
 
     t2 = create_task(w2())
     # Wait until w2 is in the queue
-    while not p._waiting:
-        await asyncio.sleep(0)
-
+    await ensure_waiting(p)
     await p.close()
 
     # Wait for the workers to finish
@@ -760,7 +763,7 @@ async def test_max_lifetime(dsn):
 
     async def worker():
         async with p.connection() as conn:
-            pids.append(conn.pgconn.backend_pid)
+            pids.append(conn.info.backend_pid)
             await asyncio.sleep(0.1)
 
     async with AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p:
index 17cac2bcb608c3582612254a561aff768e42c0e0..a7634b459779597a34060634767a85d5625f79e3 100644 (file)
@@ -79,7 +79,7 @@ def test_its_really_a_pool(dsn):
                     (pid2,) = cur.fetchone()  # type: ignore[misc]
 
         with p.connection() as conn:
-            assert conn.pgconn.backend_pid in (pid1, pid2)
+            assert conn.info.backend_pid in (pid1, pid2)
 
 
 def test_context(dsn):
@@ -92,11 +92,11 @@ def test_connection_not_lost(dsn):
     with pool.ConnectionPool(dsn, min_size=1) as p:
         with pytest.raises(ZeroDivisionError):
             with p.connection() as conn:
-                pid = conn.pgconn.backend_pid
+                pid = conn.info.backend_pid
                 1 / 0
 
         with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
+            assert conn2.info.backend_pid == pid
 
 
 @pytest.mark.slow
@@ -262,11 +262,11 @@ def test_reset_badstate(dsn, caplog):
     with pool.ConnectionPool(dsn, min_size=1, reset=reset) as p:
         with p.connection() as conn:
             conn.execute("select 1")
-            pid1 = conn.pgconn.backend_pid
+            pid1 = conn.info.backend_pid
 
         with p.connection() as conn:
             conn.execute("select 1")
-            pid2 = conn.pgconn.backend_pid
+            pid2 = conn.info.backend_pid
 
     assert pid1 != pid2
     assert caplog.records
@@ -283,11 +283,11 @@ def test_reset_broken(dsn, caplog):
     with pool.ConnectionPool(dsn, min_size=1, reset=reset) as p:
         with p.connection() as conn:
             conn.execute("select 1")
-            pid1 = conn.pgconn.backend_pid
+            pid1 = conn.info.backend_pid
 
         with p.connection() as conn:
             conn.execute("select 1")
-            pid2 = conn.pgconn.backend_pid
+            pid2 = conn.info.backend_pid
 
     assert pid1 != pid2
     assert caplog.records
@@ -480,14 +480,14 @@ def test_intrans_rollback(dsn, caplog):
 
     with pool.ConnectionPool(dsn, min_size=1) as p:
         conn = p.getconn()
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         conn.execute("create table test_intrans_rollback ()")
-        assert conn.pgconn.transaction_status == TransactionStatus.INTRANS
+        assert conn.info.transaction_status == TransactionStatus.INTRANS
         p.putconn(conn)
 
         with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid == pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
             assert not conn2.execute(
                 "select 1 from pg_class where relname = 'test_intrans_rollback'"
             ).fetchone()
@@ -501,15 +501,15 @@ def test_inerror_rollback(dsn, caplog):
 
     with pool.ConnectionPool(dsn, min_size=1) as p:
         conn = p.getconn()
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         with pytest.raises(psycopg.ProgrammingError):
             conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         p.putconn(conn)
 
         with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid == pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 1
     assert "INERROR" in caplog.records[0].message
@@ -520,16 +520,16 @@ def test_active_close(dsn, caplog):
 
     with pool.ConnectionPool(dsn, min_size=1) as p:
         conn = p.getconn()
-        pid = conn.pgconn.backend_pid
-        cur = conn.cursor()
-        with cur.copy("copy (select * from generate_series(1, 10)) to stdout"):
-            pass
-        assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE
+        pid = conn.info.backend_pid
+        conn.pgconn.exec_(
+            b"copy (select * from generate_series(1, 10)) to stdout"
+        )
+        assert conn.info.transaction_status == TransactionStatus.ACTIVE
         p.putconn(conn)
 
         with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid != pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid != pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 2
     assert "ACTIVE" in caplog.records[0].message
@@ -550,15 +550,15 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch):
         orig_rollback = conn.rollback
         monkeypatch.setattr(conn, "rollback", bad_rollback)
 
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         with pytest.raises(psycopg.ProgrammingError):
             conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         p.putconn(conn)
 
         with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid != pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid != pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 3
     assert "INERROR" in caplog.records[0].message
@@ -678,8 +678,7 @@ def test_closed_queue(dsn):
     t2 = Thread(target=w2)
     t2.start()
     # Wait until w2 is in the queue
-    while not p._waiting:
-        sleep(0)
+    ensure_waiting(p)
 
     p.close(0)
 
@@ -1020,7 +1019,7 @@ def test_max_lifetime(dsn):
         pids = []
         for i in range(5):
             with p.connection() as conn:
-                pids.append(conn.pgconn.backend_pid)
+                pids.append(conn.info.backend_pid)
             sleep(0.2)
 
     assert pids[0] == pids[1] != pids[4], pids
@@ -1031,10 +1030,10 @@ def test_check(dsn, caplog):
     with pool.ConnectionPool(dsn, min_size=4) as p:
         p.wait(1.0)
         with p.connection() as conn:
-            pid = conn.pgconn.backend_pid
+            pid = conn.info.backend_pid
 
         p.wait(1.0)
-        pids = set(conn.pgconn.backend_pid for conn in p._pool)
+        pids = set(conn.info.backend_pid for conn in p._pool)
         assert pid in pids
         conn.close()
 
@@ -1042,7 +1041,7 @@ def test_check(dsn, caplog):
         p.check()
         assert len(caplog.records) == 1
         p.wait(1.0)
-        pids2 = set(conn.pgconn.backend_pid for conn in p._pool)
+        pids2 = set(conn.info.backend_pid for conn in p._pool)
         assert len(pids & pids2) == 3
         assert pid not in pids2
 
@@ -1197,3 +1196,11 @@ def delay_connection(monkeypatch, sec):
 
     connect_orig = psycopg.Connection.connect
     monkeypatch.setattr(psycopg.Connection, "connect", connect_delay)
+
+
+def ensure_waiting(p, num=1):
+    """
+    Wait until there are at least *num* clients waiting in the queue.
+    """
+    while len(p._waiting) < num:
+        sleep(0)
index 9618af1db9ad8feaa57e030028ac739d4e52e66c..eb84868752c969ab0451705bf7702054c6323560 100644 (file)
@@ -74,7 +74,7 @@ async def test_its_really_a_pool(dsn):
                 (pid2,) = await cur.fetchone()  # type: ignore[misc]
 
         async with p.connection() as conn:
-            assert conn.pgconn.backend_pid in (pid1, pid2)
+            assert conn.info.backend_pid in (pid1, pid2)
 
 
 async def test_context(dsn):
@@ -87,11 +87,11 @@ async def test_connection_not_lost(dsn):
     async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
         with pytest.raises(ZeroDivisionError):
             async with p.connection() as conn:
-                pid = conn.pgconn.backend_pid
+                pid = conn.info.backend_pid
                 1 / 0
 
         async with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
+            assert conn2.info.backend_pid == pid
 
 
 @pytest.mark.slow
@@ -269,11 +269,11 @@ async def test_reset_badstate(dsn, caplog):
     async with pool.AsyncConnectionPool(dsn, min_size=1, reset=reset) as p:
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pid1 = conn.pgconn.backend_pid
+            pid1 = conn.info.backend_pid
 
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pid2 = conn.pgconn.backend_pid
+            pid2 = conn.info.backend_pid
 
     assert pid1 != pid2
     assert caplog.records
@@ -290,11 +290,11 @@ async def test_reset_broken(dsn, caplog):
     async with pool.AsyncConnectionPool(dsn, min_size=1, reset=reset) as p:
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pid1 = conn.pgconn.backend_pid
+            pid1 = conn.info.backend_pid
 
         async with p.connection() as conn:
             await conn.execute("select 1")
-            pid2 = conn.pgconn.backend_pid
+            pid2 = conn.info.backend_pid
 
     assert pid1 != pid2
     assert caplog.records
@@ -478,14 +478,14 @@ async def test_intrans_rollback(dsn, caplog):
 
     async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
         conn = await p.getconn()
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         await conn.execute("create table test_intrans_rollback ()")
-        assert conn.pgconn.transaction_status == TransactionStatus.INTRANS
+        assert conn.info.transaction_status == TransactionStatus.INTRANS
         await p.putconn(conn)
 
         async with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid == pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
             cur = await conn2.execute(
                 "select 1 from pg_class where relname = 'test_intrans_rollback'"
             )
@@ -500,15 +500,15 @@ async def test_inerror_rollback(dsn, caplog):
 
     async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
         conn = await p.getconn()
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         with pytest.raises(psycopg.ProgrammingError):
             await conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         await p.putconn(conn)
 
         async with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid == pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid == pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 1
     assert "INERROR" in caplog.records[0].message
@@ -519,18 +519,16 @@ async def test_active_close(dsn, caplog):
 
     async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
         conn = await p.getconn()
-        pid = conn.pgconn.backend_pid
-        cur = conn.cursor()
-        async with cur.copy(
-            "copy (select * from generate_series(1, 10)) to stdout"
-        ):
-            pass
-        assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE
+        pid = conn.info.backend_pid
+        conn.pgconn.exec_(
+            b"copy (select * from generate_series(1, 10)) to stdout"
+        )
+        assert conn.info.transaction_status == TransactionStatus.ACTIVE
         await p.putconn(conn)
 
         async with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid != pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid != pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 2
     assert "ACTIVE" in caplog.records[0].message
@@ -551,15 +549,15 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch):
         orig_rollback = conn.rollback
         monkeypatch.setattr(conn, "rollback", bad_rollback)
 
-        pid = conn.pgconn.backend_pid
+        pid = conn.info.backend_pid
         with pytest.raises(psycopg.ProgrammingError):
             await conn.execute("wat")
-        assert conn.pgconn.transaction_status == TransactionStatus.INERROR
+        assert conn.info.transaction_status == TransactionStatus.INERROR
         await p.putconn(conn)
 
         async with p.connection() as conn2:
-            assert conn2.pgconn.backend_pid != pid
-            assert conn2.pgconn.transaction_status == TransactionStatus.IDLE
+            assert conn2.info.backend_pid != pid
+            assert conn2.info.transaction_status == TransactionStatus.IDLE
 
     assert len(caplog.records) == 3
     assert "INERROR" in caplog.records[0].message
@@ -654,9 +652,7 @@ async def test_closed_queue(dsn):
 
     t2 = create_task(w2())
     # Wait until w2 is in the queue
-    while not p._waiting:
-        await asyncio.sleep(0)
-
+    await ensure_waiting(p)
     await p.close()
 
     # Wait for the workers to finish
@@ -998,7 +994,7 @@ async def test_max_lifetime(dsn):
         pids = []
         for i in range(5):
             async with p.connection() as conn:
-                pids.append(conn.pgconn.backend_pid)
+                pids.append(conn.info.backend_pid)
             await asyncio.sleep(0.2)
 
     assert pids[0] == pids[1] != pids[4], pids
@@ -1009,10 +1005,10 @@ async def test_check(dsn, caplog):
     async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
         await p.wait(1.0)
         async with p.connection() as conn:
-            pid = conn.pgconn.backend_pid
+            pid = conn.info.backend_pid
 
         await p.wait(1.0)
-        pids = set(conn.pgconn.backend_pid for conn in p._pool)
+        pids = set(conn.info.backend_pid for conn in p._pool)
         assert pid in pids
         await conn.close()
 
@@ -1020,7 +1016,7 @@ async def test_check(dsn, caplog):
         await p.check()
         assert len(caplog.records) == 1
         await p.wait(1.0)
-        pids2 = set(conn.pgconn.backend_pid for conn in p._pool)
+        pids2 = set(conn.info.backend_pid for conn in p._pool)
         assert len(pids & pids2) == 3
         assert pid not in pids2
 
@@ -1163,3 +1159,8 @@ def delay_connection(monkeypatch, sec):
 
     connect_orig = psycopg.AsyncConnection.connect
     monkeypatch.setattr(psycopg.AsyncConnection, "connect", connect_delay)
+
+
+async def ensure_waiting(p, num=1):
+    while len(p._waiting) < num:
+        await asyncio.sleep(0)