From: Daniele Varrazzo Date: Sat, 8 Jan 2022 18:37:14 +0000 (+0100) Subject: More robust null pool tests X-Git-Tag: pool-3.1~21^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f6f53c3e0c2bcef98f1566d6dd9b40e8a00580c5;p=thirdparty%2Fpsycopg.git More robust null pool tests Fix race conditions of tasks completing before a concurrent client is queued. Fix test relying on fixed broken behaviour of copy(). Use connection.info rather than connection.pgconn to obtain higher level results (e.g. Enum instead of int). --- diff --git a/tests/pool/test_null_pool.py b/tests/pool/test_null_pool.py index 9747c5e19..74b880f6c 100644 --- a/tests/pool/test_null_pool.py +++ b/tests/pool/test_null_pool.py @@ -9,7 +9,7 @@ from packaging.version import parse as ver # noqa: F401 # used in skipif import psycopg from psycopg.pq import TransactionStatus -from .test_pool import delay_connection +from .test_pool import delay_connection, ensure_waiting try: from psycopg_pool import NullConnectionPool @@ -67,7 +67,7 @@ def test_its_no_pool_at_all(dsn): (pid2,) = cur.fetchone() # type: ignore[misc] with p.connection() as conn: - assert conn.pgconn.backend_pid not in (pid1, pid2) + assert conn.info.backend_pid not in (pid1, pid2) def test_context(dsn): @@ -189,7 +189,7 @@ def test_reset(dsn): assert resets == 1 with conn.execute("show timezone") as cur: assert cur.fetchone() == ("UTC",) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with NullConnectionPool(dsn, max_size=1, reset=reset) as p: with p.connection() as conn: @@ -198,10 +198,11 @@ def test_reset(dsn): # instead of making a new one. t = Thread(target=worker) t.start() + ensure_waiting(p) assert resets == 0 conn.execute("set timezone to '+2:00'") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) t.join() p.wait() @@ -221,16 +222,17 @@ def test_reset_badstate(dsn, caplog): def worker(): with p.connection() as conn: conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with NullConnectionPool(dsn, max_size=1, reset=reset) as p: with p.connection() as conn: t = Thread(target=worker) t.start() + ensure_waiting(p) conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) t.join() @@ -251,16 +253,17 @@ def test_reset_broken(dsn, caplog): def worker(): with p.connection() as conn: conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with NullConnectionPool(dsn, max_size=1, reset=reset) as p: with p.connection() as conn: t = Thread(target=worker) t.start() + ensure_waiting(p) conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) t.join() @@ -466,8 +469,8 @@ def test_intrans_rollback(dsn, caplog): def worker(): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE assert not conn.execute( "select 1 from pg_class where relname = 'test_intrans_rollback'" ).fetchone() @@ -479,10 +482,11 @@ def test_intrans_rollback(dsn, caplog): # of making a new one. t = Thread(target=worker) t.start() + ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) conn.execute("create table test_intrans_rollback ()") - assert conn.pgconn.transaction_status == TransactionStatus.INTRANS + assert conn.info.transaction_status == TransactionStatus.INTRANS p.putconn(conn) t.join() @@ -497,8 +501,8 @@ def test_inerror_rollback(dsn, caplog): def worker(): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE with NullConnectionPool(dsn, max_size=1) as p: conn = p.getconn() @@ -507,11 +511,12 @@ def test_inerror_rollback(dsn, caplog): # of making a new one. t = Thread(target=worker) t.start() + ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with pytest.raises(psycopg.ProgrammingError): conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR p.putconn(conn) t.join() @@ -526,20 +531,21 @@ def test_active_close(dsn, caplog): def worker(): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE with NullConnectionPool(dsn, max_size=1) as p: conn = p.getconn() t = Thread(target=worker) t.start() + ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) - cur = conn.cursor() - with cur.copy("copy (select * from generate_series(1, 10)) to stdout"): - pass - assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE + pids.append(conn.info.backend_pid) + conn.pgconn.exec_( + b"copy (select * from generate_series(1, 10)) to stdout" + ) + assert conn.info.transaction_status == TransactionStatus.ACTIVE p.putconn(conn) t.join() @@ -555,8 +561,8 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): def worker(p): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE with NullConnectionPool(dsn, max_size=1) as p: conn = p.getconn() @@ -571,11 +577,12 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): t = Thread(target=worker, args=(p,)) t.start() + ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with pytest.raises(psycopg.ProgrammingError): conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR p.putconn(conn) t.join() @@ -686,8 +693,7 @@ def test_closed_queue(dsn): t2 = Thread(target=w2) t2.start() # Wait until w2 is in the queue - while not p._waiting: - sleep(0) + ensure_waiting(p) p.close(0) @@ -780,7 +786,7 @@ def test_max_lifetime(dsn): def worker(p): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) sleep(0.1) ts = [] diff --git a/tests/pool/test_null_pool_async.py b/tests/pool/test_null_pool_async.py index 9175307e7..fa31984b8 100644 --- a/tests/pool/test_null_pool_async.py +++ b/tests/pool/test_null_pool_async.py @@ -10,7 +10,7 @@ from packaging.version import parse as ver # noqa: F401 # used in skipif import psycopg from psycopg.pq import TransactionStatus from psycopg._compat import create_task -from .test_pool_async import delay_connection +from .test_pool_async import delay_connection, ensure_waiting pytestmark = [ pytest.mark.asyncio, @@ -76,7 +76,7 @@ async def test_its_no_pool_at_all(dsn): (pid2,) = await cur.fetchone() # type: ignore[misc] async with p.connection() as conn: - assert conn.pgconn.backend_pid not in (pid1, pid2) + assert conn.info.backend_pid not in (pid1, pid2) async def test_context(dsn): @@ -200,7 +200,7 @@ async def test_reset(dsn): assert resets == 1 cur = await conn.execute("show timezone") assert (await cur.fetchone()) == ("UTC",) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: @@ -208,10 +208,11 @@ async def test_reset(dsn): # Queue the worker so it will take the same connection a second time # instead of making a new one. t = create_task(worker()) + await ensure_waiting(p) assert resets == 0 await conn.execute("set timezone to '+2:00'") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await asyncio.gather(t) await p.wait() @@ -231,15 +232,16 @@ async def test_reset_badstate(dsn, caplog): async def worker(): async with p.connection() as conn: await conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: t = create_task(worker()) + await ensure_waiting(p) await conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await asyncio.gather(t) @@ -260,15 +262,16 @@ async def test_reset_broken(dsn, caplog): async def worker(): async with p.connection() as conn: await conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p: async with p.connection() as conn: t = create_task(worker()) + await ensure_waiting(p) await conn.execute("select 1") - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await asyncio.gather(t) @@ -465,8 +468,8 @@ async def test_intrans_rollback(dsn, caplog): async def worker(): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE cur = await conn.execute( "select 1 from pg_class where relname = 'test_intrans_rollback'" ) @@ -478,10 +481,11 @@ async def test_intrans_rollback(dsn, caplog): # Queue the worker so it will take the connection a second time instead # of making a new one. t = create_task(worker()) + await ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await conn.execute("create table test_intrans_rollback ()") - assert conn.pgconn.transaction_status == TransactionStatus.INTRANS + assert conn.info.transaction_status == TransactionStatus.INTRANS await p.putconn(conn) await asyncio.gather(t) @@ -496,18 +500,19 @@ async def test_inerror_rollback(dsn, caplog): async def worker(): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE async with AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = create_task(worker()) + await ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with pytest.raises(psycopg.ProgrammingError): await conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR await p.putconn(conn) await asyncio.gather(t) @@ -522,21 +527,20 @@ async def test_active_close(dsn, caplog): async def worker(): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE async with AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = create_task(worker()) + await ensure_waiting(p) - pids.append(conn.pgconn.backend_pid) - cur = conn.cursor() - async with cur.copy( - "copy (select * from generate_series(1, 10)) to stdout" - ): - pass - assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE + pids.append(conn.info.backend_pid) + conn.pgconn.exec_( + b"copy (select * from generate_series(1, 10)) to stdout" + ) + assert conn.info.transaction_status == TransactionStatus.ACTIVE await p.putconn(conn) await asyncio.gather(t) @@ -552,12 +556,13 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): async def worker(): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) - assert conn.pgconn.transaction_status == TransactionStatus.IDLE + pids.append(conn.info.backend_pid) + assert conn.info.transaction_status == TransactionStatus.IDLE async with AsyncNullConnectionPool(dsn, max_size=1) as p: conn = await p.getconn() t = create_task(worker()) + await ensure_waiting(p) async def bad_rollback(): conn.pgconn.finish() @@ -567,10 +572,10 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): orig_rollback = conn.rollback monkeypatch.setattr(conn, "rollback", bad_rollback) - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) with pytest.raises(psycopg.ProgrammingError): await conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR await p.putconn(conn) await asyncio.gather(t) @@ -668,9 +673,7 @@ async def test_closed_queue(dsn): t2 = create_task(w2()) # Wait until w2 is in the queue - while not p._waiting: - await asyncio.sleep(0) - + await ensure_waiting(p) await p.close() # Wait for the workers to finish @@ -760,7 +763,7 @@ async def test_max_lifetime(dsn): async def worker(): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await asyncio.sleep(0.1) async with AsyncNullConnectionPool(dsn, max_size=1, max_lifetime=0.2) as p: diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 17cac2bcb..a7634b459 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -79,7 +79,7 @@ def test_its_really_a_pool(dsn): (pid2,) = cur.fetchone() # type: ignore[misc] with p.connection() as conn: - assert conn.pgconn.backend_pid in (pid1, pid2) + assert conn.info.backend_pid in (pid1, pid2) def test_context(dsn): @@ -92,11 +92,11 @@ def test_connection_not_lost(dsn): with pool.ConnectionPool(dsn, min_size=1) as p: with pytest.raises(ZeroDivisionError): with p.connection() as conn: - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid 1 / 0 with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid + assert conn2.info.backend_pid == pid @pytest.mark.slow @@ -262,11 +262,11 @@ def test_reset_badstate(dsn, caplog): with pool.ConnectionPool(dsn, min_size=1, reset=reset) as p: with p.connection() as conn: conn.execute("select 1") - pid1 = conn.pgconn.backend_pid + pid1 = conn.info.backend_pid with p.connection() as conn: conn.execute("select 1") - pid2 = conn.pgconn.backend_pid + pid2 = conn.info.backend_pid assert pid1 != pid2 assert caplog.records @@ -283,11 +283,11 @@ def test_reset_broken(dsn, caplog): with pool.ConnectionPool(dsn, min_size=1, reset=reset) as p: with p.connection() as conn: conn.execute("select 1") - pid1 = conn.pgconn.backend_pid + pid1 = conn.info.backend_pid with p.connection() as conn: conn.execute("select 1") - pid2 = conn.pgconn.backend_pid + pid2 = conn.info.backend_pid assert pid1 != pid2 assert caplog.records @@ -480,14 +480,14 @@ def test_intrans_rollback(dsn, caplog): with pool.ConnectionPool(dsn, min_size=1) as p: conn = p.getconn() - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid conn.execute("create table test_intrans_rollback ()") - assert conn.pgconn.transaction_status == TransactionStatus.INTRANS + assert conn.info.transaction_status == TransactionStatus.INTRANS p.putconn(conn) with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid == pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert not conn2.execute( "select 1 from pg_class where relname = 'test_intrans_rollback'" ).fetchone() @@ -501,15 +501,15 @@ def test_inerror_rollback(dsn, caplog): with pool.ConnectionPool(dsn, min_size=1) as p: conn = p.getconn() - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid with pytest.raises(psycopg.ProgrammingError): conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR p.putconn(conn) with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid == pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 1 assert "INERROR" in caplog.records[0].message @@ -520,16 +520,16 @@ def test_active_close(dsn, caplog): with pool.ConnectionPool(dsn, min_size=1) as p: conn = p.getconn() - pid = conn.pgconn.backend_pid - cur = conn.cursor() - with cur.copy("copy (select * from generate_series(1, 10)) to stdout"): - pass - assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE + pid = conn.info.backend_pid + conn.pgconn.exec_( + b"copy (select * from generate_series(1, 10)) to stdout" + ) + assert conn.info.transaction_status == TransactionStatus.ACTIVE p.putconn(conn) with p.connection() as conn2: - assert conn2.pgconn.backend_pid != pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid != pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 2 assert "ACTIVE" in caplog.records[0].message @@ -550,15 +550,15 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): orig_rollback = conn.rollback monkeypatch.setattr(conn, "rollback", bad_rollback) - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid with pytest.raises(psycopg.ProgrammingError): conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR p.putconn(conn) with p.connection() as conn2: - assert conn2.pgconn.backend_pid != pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid != pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 3 assert "INERROR" in caplog.records[0].message @@ -678,8 +678,7 @@ def test_closed_queue(dsn): t2 = Thread(target=w2) t2.start() # Wait until w2 is in the queue - while not p._waiting: - sleep(0) + ensure_waiting(p) p.close(0) @@ -1020,7 +1019,7 @@ def test_max_lifetime(dsn): pids = [] for i in range(5): with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) sleep(0.2) assert pids[0] == pids[1] != pids[4], pids @@ -1031,10 +1030,10 @@ def test_check(dsn, caplog): with pool.ConnectionPool(dsn, min_size=4) as p: p.wait(1.0) with p.connection() as conn: - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid p.wait(1.0) - pids = set(conn.pgconn.backend_pid for conn in p._pool) + pids = set(conn.info.backend_pid for conn in p._pool) assert pid in pids conn.close() @@ -1042,7 +1041,7 @@ def test_check(dsn, caplog): p.check() assert len(caplog.records) == 1 p.wait(1.0) - pids2 = set(conn.pgconn.backend_pid for conn in p._pool) + pids2 = set(conn.info.backend_pid for conn in p._pool) assert len(pids & pids2) == 3 assert pid not in pids2 @@ -1197,3 +1196,11 @@ def delay_connection(monkeypatch, sec): connect_orig = psycopg.Connection.connect monkeypatch.setattr(psycopg.Connection, "connect", connect_delay) + + +def ensure_waiting(p, num=1): + """ + Wait until there are at least *num* clients waiting in the queue. + """ + while len(p._waiting) < num: + sleep(0) diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 9618af1db..eb8486875 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -74,7 +74,7 @@ async def test_its_really_a_pool(dsn): (pid2,) = await cur.fetchone() # type: ignore[misc] async with p.connection() as conn: - assert conn.pgconn.backend_pid in (pid1, pid2) + assert conn.info.backend_pid in (pid1, pid2) async def test_context(dsn): @@ -87,11 +87,11 @@ async def test_connection_not_lost(dsn): async with pool.AsyncConnectionPool(dsn, min_size=1) as p: with pytest.raises(ZeroDivisionError): async with p.connection() as conn: - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid 1 / 0 async with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid + assert conn2.info.backend_pid == pid @pytest.mark.slow @@ -269,11 +269,11 @@ async def test_reset_badstate(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=1, reset=reset) as p: async with p.connection() as conn: await conn.execute("select 1") - pid1 = conn.pgconn.backend_pid + pid1 = conn.info.backend_pid async with p.connection() as conn: await conn.execute("select 1") - pid2 = conn.pgconn.backend_pid + pid2 = conn.info.backend_pid assert pid1 != pid2 assert caplog.records @@ -290,11 +290,11 @@ async def test_reset_broken(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=1, reset=reset) as p: async with p.connection() as conn: await conn.execute("select 1") - pid1 = conn.pgconn.backend_pid + pid1 = conn.info.backend_pid async with p.connection() as conn: await conn.execute("select 1") - pid2 = conn.pgconn.backend_pid + pid2 = conn.info.backend_pid assert pid1 != pid2 assert caplog.records @@ -478,14 +478,14 @@ async def test_intrans_rollback(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=1) as p: conn = await p.getconn() - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid await conn.execute("create table test_intrans_rollback ()") - assert conn.pgconn.transaction_status == TransactionStatus.INTRANS + assert conn.info.transaction_status == TransactionStatus.INTRANS await p.putconn(conn) async with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid == pid + assert conn2.info.transaction_status == TransactionStatus.IDLE cur = await conn2.execute( "select 1 from pg_class where relname = 'test_intrans_rollback'" ) @@ -500,15 +500,15 @@ async def test_inerror_rollback(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=1) as p: conn = await p.getconn() - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid with pytest.raises(psycopg.ProgrammingError): await conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR await p.putconn(conn) async with p.connection() as conn2: - assert conn2.pgconn.backend_pid == pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid == pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 1 assert "INERROR" in caplog.records[0].message @@ -519,18 +519,16 @@ async def test_active_close(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=1) as p: conn = await p.getconn() - pid = conn.pgconn.backend_pid - cur = conn.cursor() - async with cur.copy( - "copy (select * from generate_series(1, 10)) to stdout" - ): - pass - assert conn.pgconn.transaction_status == TransactionStatus.ACTIVE + pid = conn.info.backend_pid + conn.pgconn.exec_( + b"copy (select * from generate_series(1, 10)) to stdout" + ) + assert conn.info.transaction_status == TransactionStatus.ACTIVE await p.putconn(conn) async with p.connection() as conn2: - assert conn2.pgconn.backend_pid != pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid != pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 2 assert "ACTIVE" in caplog.records[0].message @@ -551,15 +549,15 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): orig_rollback = conn.rollback monkeypatch.setattr(conn, "rollback", bad_rollback) - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid with pytest.raises(psycopg.ProgrammingError): await conn.execute("wat") - assert conn.pgconn.transaction_status == TransactionStatus.INERROR + assert conn.info.transaction_status == TransactionStatus.INERROR await p.putconn(conn) async with p.connection() as conn2: - assert conn2.pgconn.backend_pid != pid - assert conn2.pgconn.transaction_status == TransactionStatus.IDLE + assert conn2.info.backend_pid != pid + assert conn2.info.transaction_status == TransactionStatus.IDLE assert len(caplog.records) == 3 assert "INERROR" in caplog.records[0].message @@ -654,9 +652,7 @@ async def test_closed_queue(dsn): t2 = create_task(w2()) # Wait until w2 is in the queue - while not p._waiting: - await asyncio.sleep(0) - + await ensure_waiting(p) await p.close() # Wait for the workers to finish @@ -998,7 +994,7 @@ async def test_max_lifetime(dsn): pids = [] for i in range(5): async with p.connection() as conn: - pids.append(conn.pgconn.backend_pid) + pids.append(conn.info.backend_pid) await asyncio.sleep(0.2) assert pids[0] == pids[1] != pids[4], pids @@ -1009,10 +1005,10 @@ async def test_check(dsn, caplog): async with pool.AsyncConnectionPool(dsn, min_size=4) as p: await p.wait(1.0) async with p.connection() as conn: - pid = conn.pgconn.backend_pid + pid = conn.info.backend_pid await p.wait(1.0) - pids = set(conn.pgconn.backend_pid for conn in p._pool) + pids = set(conn.info.backend_pid for conn in p._pool) assert pid in pids await conn.close() @@ -1020,7 +1016,7 @@ async def test_check(dsn, caplog): await p.check() assert len(caplog.records) == 1 await p.wait(1.0) - pids2 = set(conn.pgconn.backend_pid for conn in p._pool) + pids2 = set(conn.info.backend_pid for conn in p._pool) assert len(pids & pids2) == 3 assert pid not in pids2 @@ -1163,3 +1159,8 @@ def delay_connection(monkeypatch, sec): connect_orig = psycopg.AsyncConnection.connect monkeypatch.setattr(psycopg.AsyncConnection, "connect", connect_delay) + + +async def ensure_waiting(p, num=1): + while len(p._waiting) < num: + await asyncio.sleep(0)