From: Daniele Varrazzo Date: Sun, 14 Sep 2025 15:46:34 +0000 (+0200) Subject: test: improve wait functions tests X-Git-Tag: 3.2.11~9^2~3 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=c7a4b20d76aabb19e1b3c41815ccae89dcc29e9d;p=thirdparty%2Fpsycopg.git test: improve wait functions tests Check for consistent behaviour and expected result. The test currently pass on the Python module and highlight shortcoming in the C function. Complete wait async test, generate sync version automatically. See #1141 --- diff --git a/tests/test_waiting.py b/tests/test_waiting.py index 2c40c4767..78eae8652 100644 --- a/tests/test_waiting.py +++ b/tests/test_waiting.py @@ -1,8 +1,10 @@ +# WARNING: this file is auto-generated by 'async_to_sync.py' +# from the original file 'test_waiting_async.py' +# DO NOT CHANGE! Change the original file instead. import sys import time import select # noqa: used in pytest.mark.skipif import socket -from threading import Event import pytest @@ -11,7 +13,7 @@ from psycopg import generators, waiting from psycopg.pq import ConnStatus, ExecStatus from psycopg.conninfo import make_conninfo -from .acompat import gather, spawn +from .acompat import Event, gather, sleep, spawn skip_if_not_linux = pytest.mark.skipif( not sys.platform.startswith("linux"), reason="non-Linux platform" @@ -31,14 +33,19 @@ waitfns = [ ] events = ["R", "W", "RW"] -intervals = [pytest.param({}, id="blank")] -intervals += [pytest.param({"interval": x}, id=str(x)) for x in [0, 0.2, 10]] +intervals = [0, 0.2, 2] -@pytest.mark.parametrize("timeout", intervals) -def test_wait_conn(dsn, timeout): +def tgen(wait): + """A generator waiting for a specific event and returning what waited on.""" + r = yield wait + return r + + +@pytest.mark.parametrize("interval", intervals) +def test_wait_conn(dsn, interval): gen = generators.connect(dsn) - conn = waiting.wait_conn(gen, **timeout) + conn = waiting.wait_conn(gen, interval) assert conn.status == ConnStatus.OK @@ -50,66 +57,194 @@ def test_wait_conn_bad(dsn): @pytest.mark.slow +@pytest.mark.skipif("sys.platform != 'linux'") +@pytest.mark.parametrize("interval", [i for i in intervals if i > 0]) +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("event", ["R", "RW"]) @pytest.mark.parametrize("waitfn", waitfns) -@skip_if_not_linux -def test_wait_r(waitfn): +def test_wait_r(waitfn, event, ready, interval, request): + # Test that wait functions handle waiting and returning state correctly + # This test doesn't work on macOS for some internal race condition betwwn + # listen/connect/accept. waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) + ready = getattr(waiting.Ready, ready) + delay = interval / 2 port = None ev = Event() def writer(): + # Wake up the socket, or let it time out, according to the expected `ready`. ev.wait() assert port - time.sleep(0.2) - with socket.create_connection(("127.0.0.1", port)): - pass + sleep(delay) + if ready == waiting.Ready.R: + with socket.create_connection(("127.0.0.1", port)): + pass - t = spawn(writer) + tasks = [spawn(writer)] - def gen(): - r = yield waiting.Wait.R - return r + try: + with socket.socket() as s: + # Make a listening socket + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.listen(10) + s.setblocking(False) + # Let the writer start + ev.set() + # Wait for socket ready to read or timing out + t0 = time.time() + r = waitfn(tgen(wait), s.fileno(), interval) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == ready + if check_timing(request): + exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready] + assert exptime <= dt < exptime * 1.2 + finally: + gather(*tasks) - with socket.socket() as s: - s.bind(("", 0)) - port = s.getsockname()[1] - s.listen() + +@pytest.mark.slow +@pytest.mark.skipif("sys.platform == 'linux'") +@pytest.mark.parametrize("interval", [2]) +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("waitfn", waitfns) +def test_wait_r_no_linux(waitfn, ready, interval, request): + # A version of test_wait_r that works on macOS too, but doesn't allow to + # test for the RW wait (because it seems that the sockets returned by + # socketpair() is immediately w-ready, including the r one. + waitfn = getattr(waiting, waitfn) + wait = waiting.Wait.R + ready = getattr(waiting.Ready, ready) + delay = interval / 2 + + ev = Event() + + def writer(): + # Wake up the socket, or let it time out, according to the expected `ready`. + ev.wait() + sleep(delay) + if ready == waiting.Ready.R: + for att in range(10): + try: + ws.sendall(b"hi") + ws.close() + except Exception as ex: + the_ex = ex + sleep(0.1) + else: + break + else: + pytest.fail( + f"failed after many attempts. Socket: {ws}, Last error: {the_ex}" + ) + + tasks = [spawn(writer)] + try: + rs, ws = socket.socketpair() + rs.setblocking(False) + ws.setblocking(False) + # Let the writer start ev.set() + # Wait for socket ready to read or timing out t0 = time.time() - r = waitfn(gen(), s.fileno(), 0.3) - tf = time.time() - assert r == waiting.Ready.R - assert 0.2 <= tf - t0 < 0.3 - - gather(t) + r = waitfn(tgen(wait), rs.fileno(), interval) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == ready + if check_timing(request): + exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready] + assert exptime <= dt < exptime * 1.2 + finally: + gather(*tasks) + rs.close() + ws.close() +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("event", ["R", "RW"]) @pytest.mark.parametrize("waitfn", waitfns) -@pytest.mark.parametrize("event", events) -@skip_if_not_linux -def test_wait_ready(waitfn, event): - wait = getattr(waiting.Wait, event) - ready = getattr(waiting.Ready, event) +def test_wait_r_nowait(waitfn, event, ready, request): + # Test that wait functions handle a poll when called with no timeout waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) + ready = getattr(waiting.Ready, ready) + + port = None + ev1 = Event() + ev2 = Event() + ev3 = Event() + + def writer(): + ev1.wait() + assert port + if ready == waiting.Ready.R: + with socket.create_connection(("127.0.0.1", port)): + ev2.set() + else: + ev2.set() + + def unblocker(): + # If test doesn't pass, wake up the socket again to avoid hanging forever + if not ev3.wait(0.5): + assert port + with socket.create_connection(("127.0.0.1", port)): + pass + + t1 = spawn(writer) + t2 = spawn(unblocker) + try: + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.listen(10) + s.setblocking(False) + ev1.set() + ev2.wait() + t0 = time.time() + r = waitfn(tgen(wait), s.fileno()) + dt = time.time() - t0 + ev3.set() # unblock the unblocker + if check_timing(request): + assert dt < 0.1 + assert r == ready + finally: + # await gather(t1) + gather(t1, t2) + - def gen(): - r = yield wait - return r +@pytest.mark.slow +@pytest.mark.parametrize("event", ["W", "RW"]) +@pytest.mark.parametrize("waitfn", waitfns) +def test_wait_w(waitfn, event, request): + # Test that wait functions handle waiting and returning state correctly + waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) - with socket.socket() as s: - r = waitfn(gen(), s.fileno()) - assert r & ready + rs, ws = socket.socketpair() # the w socket is already ready for writing + rs.setblocking(False) + ws.setblocking(False) + with rs, ws: + t0 = time.time() + r = waitfn(tgen(wait), ws.fileno(), 0.5) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == waiting.Ready.W + if check_timing(request): + assert dt < 0.1 @pytest.mark.parametrize("waitfn", waitfns) -@pytest.mark.parametrize("timeout", intervals) -def test_wait(pgconn, waitfn, timeout): +@pytest.mark.parametrize("interval", intervals) +def test_wait(pgconn, waitfn, interval): waitfn = getattr(waiting, waitfn) pgconn.send_query(b"select 1") gen = generators.execute(pgconn) - (res,) = waitfn(gen, pgconn.socket, **timeout) + (res,) = waitfn(gen, pgconn.socket, interval) assert res.status == ExecStatus.TUPLES_OK @@ -156,9 +291,9 @@ def test_wait_timeout(pgconn, waitfn): @pytest.mark.skipif( "sys.platform == 'win32'", reason="win32 works ok, but FDs are mysterious" ) -@pytest.mark.parametrize("fname", waitfns) -def test_wait_large_fd(dsn, fname): - waitfn = getattr(waiting, fname) +@pytest.mark.parametrize("waitfn", waitfns) +def test_wait_large_fd(dsn, waitfn): + waitfn = getattr(waiting, waitfn) files = [] try: @@ -173,7 +308,7 @@ def test_wait_large_fd(dsn, fname): assert pgconn.socket > 1024 pgconn.send_query(b"select 1") gen = generators.execute(pgconn) - if fname == "wait_select": + if waitfn.__name__ == "wait_select": with pytest.raises(ValueError): waitfn(gen, pgconn.socket) else: @@ -186,63 +321,25 @@ def test_wait_large_fd(dsn, fname): f.close() -@pytest.mark.parametrize("timeout", intervals) -@pytest.mark.anyio -async def test_wait_conn_async(dsn, timeout): - gen = generators.connect(dsn) - conn = await waiting.wait_conn_async(gen, **timeout) - assert conn.status == ConnStatus.OK - - -@pytest.mark.anyio -@pytest.mark.crdb("skip", reason="can connect to any db name") -async def test_wait_conn_async_bad(dsn): - gen = generators.connect(make_conninfo(dsn, dbname="nosuchdb")) - with pytest.raises(psycopg.OperationalError): - await waiting.wait_conn_async(gen) - - -@pytest.mark.anyio -@pytest.mark.parametrize("event", events) -@skip_if_not_linux -async def test_wait_ready_async(event): - wait = getattr(waiting.Wait, event) - ready = getattr(waiting.Ready, event) - - def gen(): - r = yield wait - return r - - with socket.socket() as s: - r = await waiting.wait_async(gen(), s.fileno()) - assert r & ready - - -@pytest.mark.anyio -async def test_wait_async(pgconn): - pgconn.send_query(b"select 1") - gen = generators.execute(pgconn) - (res,) = await waiting.wait_async(gen, pgconn.socket) - assert res.status == ExecStatus.TUPLES_OK - - -@pytest.mark.anyio -async def test_wait_async_bad(pgconn): - pgconn.send_query(b"select 1") - gen = generators.execute(pgconn) - socket = pgconn.socket - pgconn.finish() - with pytest.raises(psycopg.OperationalError): - await waiting.wait_async(gen, socket) - - @pytest.mark.parametrize("waitfn", waitfns) def test_wait_timeout_none_unsupported(waitfn): waitfn = getattr(waiting, waitfn) - def gen(): - r = yield waiting.Wait.R - return r - with pytest.raises(ValueError): - waitfn(gen(), 1, None) + waitfn(tgen(waiting.Wait.R), 1, None) + + +def check_timing(request): + """Return true if the test run requires to check timing + + Return false if the user has specified something like `pytest -m "not timing" + + Allow to run the tests to verify if the responses are correct but ignoring + the timing, which on macOS and Windows in CI is very slow. + """ + tokens = request.config.option.markexpr.split() + if "timing" not in tokens: + return True + if (idx := tokens.index("timing")) > 0 and tokens[idx - 1] == "not": + return False + return True diff --git a/tests/test_waiting_async.py b/tests/test_waiting_async.py new file mode 100644 index 000000000..212c9e12d --- /dev/null +++ b/tests/test_waiting_async.py @@ -0,0 +1,353 @@ +import sys +import time +import select # noqa: used in pytest.mark.skipif +import socket + +import pytest + +import psycopg +from psycopg import generators, waiting +from psycopg.pq import ConnStatus, ExecStatus +from psycopg.conninfo import make_conninfo + +from .acompat import AEvent, asleep, gather, spawn + +skip_if_not_linux = pytest.mark.skipif( + not sys.platform.startswith("linux"), reason="non-Linux platform" +) + +if True: # ASYNC + pytestmark = [pytest.mark.anyio] + waitfns = [ + "wait_async", + ] +else: + waitfns = [ + "wait", + "wait_selector", + pytest.param( + "wait_select", marks=pytest.mark.skipif("not hasattr(select, 'select')") + ), + pytest.param( + "wait_epoll", marks=pytest.mark.skipif("not hasattr(select, 'epoll')") + ), + pytest.param( + "wait_poll", marks=pytest.mark.skipif("not hasattr(select, 'poll')") + ), + pytest.param( + "wait_c", marks=pytest.mark.skipif("not psycopg._cmodule._psycopg") + ), + ] + + +events = ["R", "W", "RW"] +intervals = [0, 0.2, 2] + + +def tgen(wait): + """A generator waiting for a specific event and returning what waited on.""" + r = yield wait + return r + + +@pytest.mark.parametrize("interval", intervals) +async def test_wait_conn(dsn, interval): + gen = generators.connect(dsn) + conn = await waiting.wait_conn_async(gen, interval) + assert conn.status == ConnStatus.OK + + +@pytest.mark.crdb("skip", reason="can connect to any db name") +async def test_wait_conn_bad(dsn): + gen = generators.connect(make_conninfo(dsn, dbname="nosuchdb")) + with pytest.raises(psycopg.OperationalError): + await waiting.wait_conn_async(gen) + + +@pytest.mark.slow +@pytest.mark.skipif("sys.platform != 'linux'") +@pytest.mark.parametrize("interval", [i for i in intervals if i > 0]) +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("event", ["R", "RW"]) +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_r(waitfn, event, ready, interval, request): + # Test that wait functions handle waiting and returning state correctly + # This test doesn't work on macOS for some internal race condition betwwn + # listen/connect/accept. + waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) + ready = getattr(waiting.Ready, ready) + delay = interval / 2 + + port = None + ev = AEvent() + + async def writer(): + # Wake up the socket, or let it time out, according to the expected `ready`. + await ev.wait() + assert port + await asleep(delay) + if ready == waiting.Ready.R: + with socket.create_connection(("127.0.0.1", port)): + pass + + tasks = [spawn(writer)] + + try: + with socket.socket() as s: + # Make a listening socket + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.listen(10) + s.setblocking(False) + # Let the writer start + ev.set() + # Wait for socket ready to read or timing out + t0 = time.time() + r = await waitfn(tgen(wait), s.fileno(), interval) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == ready + if check_timing(request): + exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready] + assert exptime <= dt < (exptime * 1.2) + finally: + await gather(*tasks) + + +@pytest.mark.slow +@pytest.mark.skipif("sys.platform == 'linux'") +@pytest.mark.parametrize("interval", [2]) +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_r_no_linux(waitfn, ready, interval, request): + # A version of test_wait_r that works on macOS too, but doesn't allow to + # test for the RW wait (because it seems that the sockets returned by + # socketpair() is immediately w-ready, including the r one. + waitfn = getattr(waiting, waitfn) + wait = waiting.Wait.R + ready = getattr(waiting.Ready, ready) + delay = interval / 2 + + ev = AEvent() + + async def writer(): + # Wake up the socket, or let it time out, according to the expected `ready`. + await ev.wait() + await asleep(delay) + if ready == waiting.Ready.R: + for att in range(10): + try: + ws.sendall(b"hi") + ws.close() + except Exception as ex: + the_ex = ex + await asleep(0.1) + else: + break + else: + pytest.fail( + f"failed after many attempts. Socket: {ws}, Last error: {the_ex}" + ) + + tasks = [spawn(writer)] + try: + rs, ws = socket.socketpair() + rs.setblocking(False) + ws.setblocking(False) + # Let the writer start + ev.set() + # Wait for socket ready to read or timing out + t0 = time.time() + r = await waitfn(tgen(wait), rs.fileno(), interval) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == ready + if check_timing(request): + exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready] + assert exptime <= dt < (exptime * 1.2) + finally: + await gather(*tasks) + rs.close() + ws.close() + + +@pytest.mark.parametrize("ready", ["R", "NONE"]) +@pytest.mark.parametrize("event", ["R", "RW"]) +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_r_nowait(waitfn, event, ready, request): + # Test that wait functions handle a poll when called with no timeout + waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) + ready = getattr(waiting.Ready, ready) + + port = None + ev1 = AEvent() + ev2 = AEvent() + ev3 = AEvent() + + async def writer(): + await ev1.wait() + assert port + if ready == waiting.Ready.R: + with socket.create_connection(("127.0.0.1", port)): + ev2.set() + else: + ev2.set() + + async def unblocker(): + # If test doesn't pass, wake up the socket again to avoid hanging forever + if not await ev3.wait_timeout(0.5): + assert port + with socket.create_connection(("127.0.0.1", port)): + pass + + t1 = spawn(writer) + t2 = spawn(unblocker) + try: + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.listen(10) + s.setblocking(False) + ev1.set() + await ev2.wait() + t0 = time.time() + r = await waitfn(tgen(wait), s.fileno()) + dt = time.time() - t0 + ev3.set() # unblock the unblocker + if check_timing(request): + assert dt < 0.1 + assert r == ready + finally: + # await gather(t1) + await gather(t1, t2) + + +@pytest.mark.slow +@pytest.mark.parametrize("event", ["W", "RW"]) +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_w(waitfn, event, request): + # Test that wait functions handle waiting and returning state correctly + waitfn = getattr(waiting, waitfn) + wait = getattr(waiting.Wait, event) + + rs, ws = socket.socketpair() # the w socket is already ready for writing + rs.setblocking(False) + ws.setblocking(False) + with rs, ws: + t0 = time.time() + r = await waitfn(tgen(wait), ws.fileno(), 0.5) + dt = time.time() - t0 + # Check timing and received waiting state + assert r == waiting.Ready.W + if check_timing(request): + assert dt < 0.1 + + +@pytest.mark.parametrize("waitfn", waitfns) +@pytest.mark.parametrize("interval", intervals) +async def test_wait(pgconn, waitfn, interval): + waitfn = getattr(waiting, waitfn) + + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + (res,) = await waitfn(gen, pgconn.socket, interval) + assert res.status == ExecStatus.TUPLES_OK + + +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_bad(pgconn, waitfn): + waitfn = getattr(waiting, waitfn) + + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + pgconn.finish() + with pytest.raises(psycopg.OperationalError): + await waitfn(gen, pgconn.socket) + + +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_timeout(pgconn, waitfn): + waitfn = getattr(waiting, waitfn) + + pgconn.send_query(b"select pg_sleep(0.5)") + gen = generators.execute(pgconn) + + ts = [time.time()] + + def gen_wrapper(): + try: + for x in gen: + res = yield x + ts.append(time.time()) + gen.send(res) + except StopIteration as ex: + return ex.value + + (res,) = await waitfn(gen_wrapper(), pgconn.socket, interval=0.1) + assert res.status == ExecStatus.TUPLES_OK + ds = [t1 - t0 for t0, t1 in zip(ts[:-1], ts[1:])] + assert len(ds) >= 5 + for d in ds[:5]: + assert d == pytest.approx(0.1, 0.05) + + +@pytest.mark.slow +@pytest.mark.skipif( + "sys.platform == 'win32'", reason="win32 works ok, but FDs are mysterious" +) +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_large_fd(dsn, waitfn): + waitfn = getattr(waiting, waitfn) + + files = [] + try: + try: + for i in range(1100): + files.append(open(__file__)) + except OSError: + pytest.skip("can't open the number of files needed for the test") + + pgconn = psycopg.pq.PGconn.connect(dsn.encode()) + try: + assert pgconn.socket > 1024 + pgconn.send_query(b"select 1") + gen = generators.execute(pgconn) + if waitfn.__name__ == "wait_select": + with pytest.raises(ValueError): + await waitfn(gen, pgconn.socket) + else: + (res,) = await waitfn(gen, pgconn.socket) + assert res.status == ExecStatus.TUPLES_OK + finally: + pgconn.finish() + finally: + for f in files: + f.close() + + +@pytest.mark.parametrize("waitfn", waitfns) +async def test_wait_timeout_none_unsupported(waitfn): + waitfn = getattr(waiting, waitfn) + + with pytest.raises(ValueError): + await waitfn(tgen(waiting.Wait.R), 1, None) + + +def check_timing(request): + """Return true if the test run requires to check timing + + Return false if the user has specified something like `pytest -m "not timing" + + Allow to run the tests to verify if the responses are correct but ignoring + the timing, which on macOS and Windows in CI is very slow. + """ + tokens = request.config.option.markexpr.split() + if "timing" not in tokens: + return True + if (idx := tokens.index("timing")) > 0 and tokens[idx - 1] == "not": + return False + return True diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py index 08e6627f9..e26425bfb 100755 --- a/tools/async_to_sync.py +++ b/tools/async_to_sync.py @@ -59,6 +59,7 @@ ALL_INPUTS = """ tests/test_prepared_async.py tests/test_tpc_async.py tests/test_transaction_async.py + tests/test_waiting_async.py """.split() PROJECT_DIR = Path(__file__).parent.parent