]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: improve wait functions tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 14 Sep 2025 15:46:34 +0000 (17:46 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 13 Oct 2025 11:26:01 +0000 (13:26 +0200)
Check for consistent behaviour and expected result. The test currently
pass on the Python module and highlight shortcoming in the C function.

Complete wait async test, generate sync version automatically.

See #1141

tests/test_waiting.py
tests/test_waiting_async.py [new file with mode: 0644]
tools/async_to_sync.py

index 2c40c476743dd597b7eb88e990647078bdbaada4..78eae8652fd57bdf3cfe80dbac4163f413bce6c4 100644 (file)
@@ -1,8 +1,10 @@
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_waiting_async.py'
+# DO NOT CHANGE! Change the original file instead.
 import sys
 import time
 import select  # noqa: used in pytest.mark.skipif
 import socket
-from threading import Event
 
 import pytest
 
@@ -11,7 +13,7 @@ from psycopg import generators, waiting
 from psycopg.pq import ConnStatus, ExecStatus
 from psycopg.conninfo import make_conninfo
 
-from .acompat import gather, spawn
+from .acompat import Event, gather, sleep, spawn
 
 skip_if_not_linux = pytest.mark.skipif(
     not sys.platform.startswith("linux"), reason="non-Linux platform"
@@ -31,14 +33,19 @@ waitfns = [
 ]
 
 events = ["R", "W", "RW"]
-intervals = [pytest.param({}, id="blank")]
-intervals += [pytest.param({"interval": x}, id=str(x)) for x in [0, 0.2, 10]]
+intervals = [0, 0.2, 2]
 
 
-@pytest.mark.parametrize("timeout", intervals)
-def test_wait_conn(dsn, timeout):
+def tgen(wait):
+    """A generator waiting for a specific event and returning what waited on."""
+    r = yield wait
+    return r
+
+
+@pytest.mark.parametrize("interval", intervals)
+def test_wait_conn(dsn, interval):
     gen = generators.connect(dsn)
-    conn = waiting.wait_conn(gen, **timeout)
+    conn = waiting.wait_conn(gen, interval)
     assert conn.status == ConnStatus.OK
 
 
@@ -50,66 +57,194 @@ def test_wait_conn_bad(dsn):
 
 
 @pytest.mark.slow
+@pytest.mark.skipif("sys.platform != 'linux'")
+@pytest.mark.parametrize("interval", [i for i in intervals if i > 0])
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("event", ["R", "RW"])
 @pytest.mark.parametrize("waitfn", waitfns)
-@skip_if_not_linux
-def test_wait_r(waitfn):
+def test_wait_r(waitfn, event, ready, interval, request):
+    # Test that wait functions handle waiting and returning state correctly
+    # This test doesn't work on macOS for some internal race condition betwwn
+    # listen/connect/accept.
     waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
+    ready = getattr(waiting.Ready, ready)
+    delay = interval / 2
 
     port = None
     ev = Event()
 
     def writer():
+        # Wake up the socket, or let it time out, according to the expected `ready`.
         ev.wait()
         assert port
-        time.sleep(0.2)
-        with socket.create_connection(("127.0.0.1", port)):
-            pass
+        sleep(delay)
+        if ready == waiting.Ready.R:
+            with socket.create_connection(("127.0.0.1", port)):
+                pass
 
-    t = spawn(writer)
+    tasks = [spawn(writer)]
 
-    def gen():
-        r = yield waiting.Wait.R
-        return r
+    try:
+        with socket.socket() as s:
+            # Make a listening socket
+            s.bind(("127.0.0.1", 0))
+            port = s.getsockname()[1]
+            s.listen(10)
+            s.setblocking(False)
+            # Let the writer start
+            ev.set()
+            # Wait for socket ready to read or timing out
+            t0 = time.time()
+            r = waitfn(tgen(wait), s.fileno(), interval)
+            dt = time.time() - t0
+            # Check timing and received waiting state
+            assert r == ready
+            if check_timing(request):
+                exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
+                assert exptime <= dt < exptime * 1.2
+    finally:
+        gather(*tasks)
 
-    with socket.socket() as s:
-        s.bind(("", 0))
-        port = s.getsockname()[1]
-        s.listen()
+
+@pytest.mark.slow
+@pytest.mark.skipif("sys.platform == 'linux'")
+@pytest.mark.parametrize("interval", [2])
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_r_no_linux(waitfn, ready, interval, request):
+    # A version of test_wait_r that works on macOS too, but doesn't allow to
+    # test for the RW wait (because it seems that the sockets returned by
+    # socketpair() is immediately w-ready, including the r one.
+    waitfn = getattr(waiting, waitfn)
+    wait = waiting.Wait.R
+    ready = getattr(waiting.Ready, ready)
+    delay = interval / 2
+
+    ev = Event()
+
+    def writer():
+        # Wake up the socket, or let it time out, according to the expected `ready`.
+        ev.wait()
+        sleep(delay)
+        if ready == waiting.Ready.R:
+            for att in range(10):
+                try:
+                    ws.sendall(b"hi")
+                    ws.close()
+                except Exception as ex:
+                    the_ex = ex
+                    sleep(0.1)
+                else:
+                    break
+            else:
+                pytest.fail(
+                    f"failed after many attempts. Socket: {ws}, Last error: {the_ex}"
+                )
+
+    tasks = [spawn(writer)]
+    try:
+        rs, ws = socket.socketpair()
+        rs.setblocking(False)
+        ws.setblocking(False)
+        # Let the writer start
         ev.set()
+        # Wait for socket ready to read or timing out
         t0 = time.time()
-        r = waitfn(gen(), s.fileno(), 0.3)
-        tf = time.time()
-        assert r == waiting.Ready.R
-        assert 0.2 <= tf - t0 < 0.3
-
-    gather(t)
+        r = waitfn(tgen(wait), rs.fileno(), interval)
+        dt = time.time() - t0
+        # Check timing and received waiting state
+        assert r == ready
+        if check_timing(request):
+            exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
+            assert exptime <= dt < exptime * 1.2
+    finally:
+        gather(*tasks)
+        rs.close()
+        ws.close()
 
 
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("event", ["R", "RW"])
 @pytest.mark.parametrize("waitfn", waitfns)
-@pytest.mark.parametrize("event", events)
-@skip_if_not_linux
-def test_wait_ready(waitfn, event):
-    wait = getattr(waiting.Wait, event)
-    ready = getattr(waiting.Ready, event)
+def test_wait_r_nowait(waitfn, event, ready, request):
+    # Test that wait functions handle a poll when called with no timeout
     waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
+    ready = getattr(waiting.Ready, ready)
+
+    port = None
+    ev1 = Event()
+    ev2 = Event()
+    ev3 = Event()
+
+    def writer():
+        ev1.wait()
+        assert port
+        if ready == waiting.Ready.R:
+            with socket.create_connection(("127.0.0.1", port)):
+                ev2.set()
+        else:
+            ev2.set()
+
+    def unblocker():
+        # If test doesn't pass, wake up the socket again to avoid hanging forever
+        if not ev3.wait(0.5):
+            assert port
+            with socket.create_connection(("127.0.0.1", port)):
+                pass
+
+    t1 = spawn(writer)
+    t2 = spawn(unblocker)
+    try:
+        with socket.socket() as s:
+            s.bind(("127.0.0.1", 0))
+            port = s.getsockname()[1]
+            s.listen(10)
+            s.setblocking(False)
+            ev1.set()
+            ev2.wait()
+            t0 = time.time()
+            r = waitfn(tgen(wait), s.fileno())
+            dt = time.time() - t0
+            ev3.set()  # unblock the unblocker
+            if check_timing(request):
+                assert dt < 0.1
+            assert r == ready
+    finally:
+        # await gather(t1)
+        gather(t1, t2)
+
 
-    def gen():
-        r = yield wait
-        return r
+@pytest.mark.slow
+@pytest.mark.parametrize("event", ["W", "RW"])
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_w(waitfn, event, request):
+    # Test that wait functions handle waiting and returning state correctly
+    waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
 
-    with socket.socket() as s:
-        r = waitfn(gen(), s.fileno())
-    assert r & ready
+    rs, ws = socket.socketpair()  # the w socket is already ready for writing
+    rs.setblocking(False)
+    ws.setblocking(False)
+    with rs, ws:
+        t0 = time.time()
+        r = waitfn(tgen(wait), ws.fileno(), 0.5)
+        dt = time.time() - t0
+        # Check timing and received waiting state
+        assert r == waiting.Ready.W
+        if check_timing(request):
+            assert dt < 0.1
 
 
 @pytest.mark.parametrize("waitfn", waitfns)
-@pytest.mark.parametrize("timeout", intervals)
-def test_wait(pgconn, waitfn, timeout):
+@pytest.mark.parametrize("interval", intervals)
+def test_wait(pgconn, waitfn, interval):
     waitfn = getattr(waiting, waitfn)
 
     pgconn.send_query(b"select 1")
     gen = generators.execute(pgconn)
-    (res,) = waitfn(gen, pgconn.socket, **timeout)
+    (res,) = waitfn(gen, pgconn.socket, interval)
     assert res.status == ExecStatus.TUPLES_OK
 
 
@@ -156,9 +291,9 @@ def test_wait_timeout(pgconn, waitfn):
 @pytest.mark.skipif(
     "sys.platform == 'win32'", reason="win32 works ok, but FDs are mysterious"
 )
-@pytest.mark.parametrize("fname", waitfns)
-def test_wait_large_fd(dsn, fname):
-    waitfn = getattr(waiting, fname)
+@pytest.mark.parametrize("waitfn", waitfns)
+def test_wait_large_fd(dsn, waitfn):
+    waitfn = getattr(waiting, waitfn)
 
     files = []
     try:
@@ -173,7 +308,7 @@ def test_wait_large_fd(dsn, fname):
             assert pgconn.socket > 1024
             pgconn.send_query(b"select 1")
             gen = generators.execute(pgconn)
-            if fname == "wait_select":
+            if waitfn.__name__ == "wait_select":
                 with pytest.raises(ValueError):
                     waitfn(gen, pgconn.socket)
             else:
@@ -186,63 +321,25 @@ def test_wait_large_fd(dsn, fname):
             f.close()
 
 
-@pytest.mark.parametrize("timeout", intervals)
-@pytest.mark.anyio
-async def test_wait_conn_async(dsn, timeout):
-    gen = generators.connect(dsn)
-    conn = await waiting.wait_conn_async(gen, **timeout)
-    assert conn.status == ConnStatus.OK
-
-
-@pytest.mark.anyio
-@pytest.mark.crdb("skip", reason="can connect to any db name")
-async def test_wait_conn_async_bad(dsn):
-    gen = generators.connect(make_conninfo(dsn, dbname="nosuchdb"))
-    with pytest.raises(psycopg.OperationalError):
-        await waiting.wait_conn_async(gen)
-
-
-@pytest.mark.anyio
-@pytest.mark.parametrize("event", events)
-@skip_if_not_linux
-async def test_wait_ready_async(event):
-    wait = getattr(waiting.Wait, event)
-    ready = getattr(waiting.Ready, event)
-
-    def gen():
-        r = yield wait
-        return r
-
-    with socket.socket() as s:
-        r = await waiting.wait_async(gen(), s.fileno())
-    assert r & ready
-
-
-@pytest.mark.anyio
-async def test_wait_async(pgconn):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    (res,) = await waiting.wait_async(gen, pgconn.socket)
-    assert res.status == ExecStatus.TUPLES_OK
-
-
-@pytest.mark.anyio
-async def test_wait_async_bad(pgconn):
-    pgconn.send_query(b"select 1")
-    gen = generators.execute(pgconn)
-    socket = pgconn.socket
-    pgconn.finish()
-    with pytest.raises(psycopg.OperationalError):
-        await waiting.wait_async(gen, socket)
-
-
 @pytest.mark.parametrize("waitfn", waitfns)
 def test_wait_timeout_none_unsupported(waitfn):
     waitfn = getattr(waiting, waitfn)
 
-    def gen():
-        r = yield waiting.Wait.R
-        return r
-
     with pytest.raises(ValueError):
-        waitfn(gen(), 1, None)
+        waitfn(tgen(waiting.Wait.R), 1, None)
+
+
+def check_timing(request):
+    """Return true if the test run requires to check timing
+
+    Return false if the user has specified something like `pytest -m "not timing"
+
+    Allow to run the tests to verify if the responses are correct but ignoring
+    the timing, which on macOS and Windows in CI is very slow.
+    """
+    tokens = request.config.option.markexpr.split()
+    if "timing" not in tokens:
+        return True
+    if (idx := tokens.index("timing")) > 0 and tokens[idx - 1] == "not":
+        return False
+    return True
diff --git a/tests/test_waiting_async.py b/tests/test_waiting_async.py
new file mode 100644 (file)
index 0000000..212c9e1
--- /dev/null
@@ -0,0 +1,353 @@
+import sys
+import time
+import select  # noqa: used in pytest.mark.skipif
+import socket
+
+import pytest
+
+import psycopg
+from psycopg import generators, waiting
+from psycopg.pq import ConnStatus, ExecStatus
+from psycopg.conninfo import make_conninfo
+
+from .acompat import AEvent, asleep, gather, spawn
+
+skip_if_not_linux = pytest.mark.skipif(
+    not sys.platform.startswith("linux"), reason="non-Linux platform"
+)
+
+if True:  # ASYNC
+    pytestmark = [pytest.mark.anyio]
+    waitfns = [
+        "wait_async",
+    ]
+else:
+    waitfns = [
+        "wait",
+        "wait_selector",
+        pytest.param(
+            "wait_select", marks=pytest.mark.skipif("not hasattr(select, 'select')")
+        ),
+        pytest.param(
+            "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')")
+        ),
+        pytest.param(
+            "wait_poll", marks=pytest.mark.skipif("not hasattr(select, 'poll')")
+        ),
+        pytest.param(
+            "wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg")
+        ),
+    ]
+
+
+events = ["R", "W", "RW"]
+intervals = [0, 0.2, 2]
+
+
+def tgen(wait):
+    """A generator waiting for a specific event and returning what waited on."""
+    r = yield wait
+    return r
+
+
+@pytest.mark.parametrize("interval", intervals)
+async def test_wait_conn(dsn, interval):
+    gen = generators.connect(dsn)
+    conn = await waiting.wait_conn_async(gen, interval)
+    assert conn.status == ConnStatus.OK
+
+
+@pytest.mark.crdb("skip", reason="can connect to any db name")
+async def test_wait_conn_bad(dsn):
+    gen = generators.connect(make_conninfo(dsn, dbname="nosuchdb"))
+    with pytest.raises(psycopg.OperationalError):
+        await waiting.wait_conn_async(gen)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif("sys.platform != 'linux'")
+@pytest.mark.parametrize("interval", [i for i in intervals if i > 0])
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("event", ["R", "RW"])
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_r(waitfn, event, ready, interval, request):
+    # Test that wait functions handle waiting and returning state correctly
+    # This test doesn't work on macOS for some internal race condition betwwn
+    # listen/connect/accept.
+    waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
+    ready = getattr(waiting.Ready, ready)
+    delay = interval / 2
+
+    port = None
+    ev = AEvent()
+
+    async def writer():
+        # Wake up the socket, or let it time out, according to the expected `ready`.
+        await ev.wait()
+        assert port
+        await asleep(delay)
+        if ready == waiting.Ready.R:
+            with socket.create_connection(("127.0.0.1", port)):
+                pass
+
+    tasks = [spawn(writer)]
+
+    try:
+        with socket.socket() as s:
+            # Make a listening socket
+            s.bind(("127.0.0.1", 0))
+            port = s.getsockname()[1]
+            s.listen(10)
+            s.setblocking(False)
+            # Let the writer start
+            ev.set()
+            # Wait for socket ready to read or timing out
+            t0 = time.time()
+            r = await waitfn(tgen(wait), s.fileno(), interval)
+            dt = time.time() - t0
+            # Check timing and received waiting state
+            assert r == ready
+            if check_timing(request):
+                exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
+                assert exptime <= dt < (exptime * 1.2)
+    finally:
+        await gather(*tasks)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif("sys.platform == 'linux'")
+@pytest.mark.parametrize("interval", [2])
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_r_no_linux(waitfn, ready, interval, request):
+    # A version of test_wait_r that works on macOS too, but doesn't allow to
+    # test for the RW wait (because it seems that the sockets returned by
+    # socketpair() is immediately w-ready, including the r one.
+    waitfn = getattr(waiting, waitfn)
+    wait = waiting.Wait.R
+    ready = getattr(waiting.Ready, ready)
+    delay = interval / 2
+
+    ev = AEvent()
+
+    async def writer():
+        # Wake up the socket, or let it time out, according to the expected `ready`.
+        await ev.wait()
+        await asleep(delay)
+        if ready == waiting.Ready.R:
+            for att in range(10):
+                try:
+                    ws.sendall(b"hi")
+                    ws.close()
+                except Exception as ex:
+                    the_ex = ex
+                    await asleep(0.1)
+                else:
+                    break
+            else:
+                pytest.fail(
+                    f"failed after many attempts. Socket: {ws}, Last error: {the_ex}"
+                )
+
+    tasks = [spawn(writer)]
+    try:
+        rs, ws = socket.socketpair()
+        rs.setblocking(False)
+        ws.setblocking(False)
+        # Let the writer start
+        ev.set()
+        # Wait for socket ready to read or timing out
+        t0 = time.time()
+        r = await waitfn(tgen(wait), rs.fileno(), interval)
+        dt = time.time() - t0
+        # Check timing and received waiting state
+        assert r == ready
+        if check_timing(request):
+            exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
+            assert exptime <= dt < (exptime * 1.2)
+    finally:
+        await gather(*tasks)
+        rs.close()
+        ws.close()
+
+
+@pytest.mark.parametrize("ready", ["R", "NONE"])
+@pytest.mark.parametrize("event", ["R", "RW"])
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_r_nowait(waitfn, event, ready, request):
+    # Test that wait functions handle a poll when called with no timeout
+    waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
+    ready = getattr(waiting.Ready, ready)
+
+    port = None
+    ev1 = AEvent()
+    ev2 = AEvent()
+    ev3 = AEvent()
+
+    async def writer():
+        await ev1.wait()
+        assert port
+        if ready == waiting.Ready.R:
+            with socket.create_connection(("127.0.0.1", port)):
+                ev2.set()
+        else:
+            ev2.set()
+
+    async def unblocker():
+        # If test doesn't pass, wake up the socket again to avoid hanging forever
+        if not await ev3.wait_timeout(0.5):
+            assert port
+            with socket.create_connection(("127.0.0.1", port)):
+                pass
+
+    t1 = spawn(writer)
+    t2 = spawn(unblocker)
+    try:
+        with socket.socket() as s:
+            s.bind(("127.0.0.1", 0))
+            port = s.getsockname()[1]
+            s.listen(10)
+            s.setblocking(False)
+            ev1.set()
+            await ev2.wait()
+            t0 = time.time()
+            r = await waitfn(tgen(wait), s.fileno())
+            dt = time.time() - t0
+            ev3.set()  # unblock the unblocker
+            if check_timing(request):
+                assert dt < 0.1
+            assert r == ready
+    finally:
+        # await gather(t1)
+        await gather(t1, t2)
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("event", ["W", "RW"])
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_w(waitfn, event, request):
+    # Test that wait functions handle waiting and returning state correctly
+    waitfn = getattr(waiting, waitfn)
+    wait = getattr(waiting.Wait, event)
+
+    rs, ws = socket.socketpair()  # the w socket is already ready for writing
+    rs.setblocking(False)
+    ws.setblocking(False)
+    with rs, ws:
+        t0 = time.time()
+        r = await waitfn(tgen(wait), ws.fileno(), 0.5)
+        dt = time.time() - t0
+        # Check timing and received waiting state
+        assert r == waiting.Ready.W
+        if check_timing(request):
+            assert dt < 0.1
+
+
+@pytest.mark.parametrize("waitfn", waitfns)
+@pytest.mark.parametrize("interval", intervals)
+async def test_wait(pgconn, waitfn, interval):
+    waitfn = getattr(waiting, waitfn)
+
+    pgconn.send_query(b"select 1")
+    gen = generators.execute(pgconn)
+    (res,) = await waitfn(gen, pgconn.socket, interval)
+    assert res.status == ExecStatus.TUPLES_OK
+
+
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_bad(pgconn, waitfn):
+    waitfn = getattr(waiting, waitfn)
+
+    pgconn.send_query(b"select 1")
+    gen = generators.execute(pgconn)
+    pgconn.finish()
+    with pytest.raises(psycopg.OperationalError):
+        await waitfn(gen, pgconn.socket)
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_timeout(pgconn, waitfn):
+    waitfn = getattr(waiting, waitfn)
+
+    pgconn.send_query(b"select pg_sleep(0.5)")
+    gen = generators.execute(pgconn)
+
+    ts = [time.time()]
+
+    def gen_wrapper():
+        try:
+            for x in gen:
+                res = yield x
+                ts.append(time.time())
+                gen.send(res)
+        except StopIteration as ex:
+            return ex.value
+
+    (res,) = await waitfn(gen_wrapper(), pgconn.socket, interval=0.1)
+    assert res.status == ExecStatus.TUPLES_OK
+    ds = [t1 - t0 for t0, t1 in zip(ts[:-1], ts[1:])]
+    assert len(ds) >= 5
+    for d in ds[:5]:
+        assert d == pytest.approx(0.1, 0.05)
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(
+    "sys.platform == 'win32'", reason="win32 works ok, but FDs are mysterious"
+)
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_large_fd(dsn, waitfn):
+    waitfn = getattr(waiting, waitfn)
+
+    files = []
+    try:
+        try:
+            for i in range(1100):
+                files.append(open(__file__))
+        except OSError:
+            pytest.skip("can't open the number of files needed for the test")
+
+        pgconn = psycopg.pq.PGconn.connect(dsn.encode())
+        try:
+            assert pgconn.socket > 1024
+            pgconn.send_query(b"select 1")
+            gen = generators.execute(pgconn)
+            if waitfn.__name__ == "wait_select":
+                with pytest.raises(ValueError):
+                    await waitfn(gen, pgconn.socket)
+            else:
+                (res,) = await waitfn(gen, pgconn.socket)
+                assert res.status == ExecStatus.TUPLES_OK
+        finally:
+            pgconn.finish()
+    finally:
+        for f in files:
+            f.close()
+
+
+@pytest.mark.parametrize("waitfn", waitfns)
+async def test_wait_timeout_none_unsupported(waitfn):
+    waitfn = getattr(waiting, waitfn)
+
+    with pytest.raises(ValueError):
+        await waitfn(tgen(waiting.Wait.R), 1, None)
+
+
+def check_timing(request):
+    """Return true if the test run requires to check timing
+
+    Return false if the user has specified something like `pytest -m "not timing"
+
+    Allow to run the tests to verify if the responses are correct but ignoring
+    the timing, which on macOS and Windows in CI is very slow.
+    """
+    tokens = request.config.option.markexpr.split()
+    if "timing" not in tokens:
+        return True
+    if (idx := tokens.index("timing")) > 0 and tokens[idx - 1] == "not":
+        return False
+    return True
index 08e6627f9d3473e5dedfe99591ad7f364fc2c2bd..e26425bfb617c5b774319d590ad4bd96cc2f0665 100755 (executable)
@@ -59,6 +59,7 @@ ALL_INPUTS = """
     tests/test_prepared_async.py
     tests/test_tpc_async.py
     tests/test_transaction_async.py
+    tests/test_waiting_async.py
 """.split()
 
 PROJECT_DIR = Path(__file__).parent.parent