From: Daniele Varrazzo Date: Mon, 11 Sep 2023 17:50:44 +0000 (+0100) Subject: refactor(tests): generate test_sched from async counterpart X-Git-Tag: pool-3.2.0~12^2~36 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8412452174b6fec44bb788a373b17300143e4ca5;p=thirdparty%2Fpsycopg.git refactor(tests): generate test_sched from async counterpart --- diff --git a/tests/pool/test_sched.py b/tests/pool/test_sched.py index b58be2f81..2639dc1f8 100644 --- a/tests/pool/test_sched.py +++ b/tests/pool/test_sched.py @@ -1,10 +1,15 @@ +# WARNING: this file is auto-generated by 'async_to_sync.py' +# from the original file 'test_sched_async.py' +# DO NOT CHANGE! Change the original file instead. import logging -from time import time, sleep +from time import time from functools import partial -from threading import Thread +from contextlib import contextmanager import pytest +from ..utils import spawn, gather, sleep + try: from psycopg_pool.sched import Scheduler except ImportError: @@ -15,7 +20,6 @@ pytestmark = [pytest.mark.timing] @pytest.mark.slow -@pytest.mark.timing def test_sched(): s = Scheduler() results = [] @@ -37,11 +41,9 @@ def test_sched(): @pytest.mark.slow -@pytest.mark.timing -def test_sched_thread(): +def test_sched_task(): s = Scheduler() - t = Thread(target=s.run, daemon=True) - t.start() + t = spawn(s.run) results = [] @@ -54,7 +56,7 @@ def test_sched_thread(): s.enter(0.3, None) s.enter(0.2, partial(worker, 2)) - t.join() + gather(t) t1 = time() assert t1 - t0 == pytest.approx(0.3, 0.2) @@ -66,12 +68,10 @@ def test_sched_thread(): @pytest.mark.slow -@pytest.mark.timing def test_sched_error(caplog): caplog.set_level(logging.WARNING, logger="psycopg") s = Scheduler() - t = Thread(target=s.run, daemon=True) - t.start() + t = spawn(s.run) results = [] @@ -87,7 +87,7 @@ def test_sched_error(caplog): s.enter(0.3, partial(worker, 2)) s.enter(0.2, error) - t.join() + gather(t) t1 = time() assert t1 - t0 == pytest.approx(0.4, 0.1) @@ -105,25 +105,14 @@ def test_sched_error(caplog): def test_empty_queue_timeout(): s = Scheduler() - t0 = time() - times = [] + with timed_wait(s) as times: + s.EMPTY_QUEUE_TIMEOUT = 0.2 - wait_orig = s._event.wait - - def wait_logging(timeout=None): - rv = wait_orig(timeout) - times.append(time() - t0) - return rv + t = spawn(s.run) + sleep(0.5) + s.enter(0.5, None) + gather(t) - setattr(s._event, "wait", wait_logging) - s.EMPTY_QUEUE_TIMEOUT = 0.2 - - t = Thread(target=s.run) - t.start() - sleep(0.5) - s.enter(0.5, None) - t.join() - times.append(time() - t0) for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]): assert got == pytest.approx(want, 0.2), times @@ -132,26 +121,47 @@ def test_empty_queue_timeout(): def test_first_task_rescheduling(): s = Scheduler() + with timed_wait(s) as times: + s.EMPTY_QUEUE_TIMEOUT = 0.1 + + s.enter(0.4, noop) + t = spawn(s.run) + s.enter(0.6, None) # this task doesn't trigger a reschedule + sleep(0.1) + s.enter(0.1, noop) # this triggers a reschedule + gather(t) + + for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]): + assert got == pytest.approx(want, 0.2), times + + +@contextmanager +def timed_wait(s): + """ + Hack the scheduler's Event.wait() function in order to log waited time. + + The context is a list where the times are accumulated. + """ t0 = time() times = [] wait_orig = s._event.wait def wait_logging(timeout=None): - rv = wait_orig(timeout) - times.append(time() - t0) + args = (timeout,) + + try: + rv = wait_orig(*args) + finally: + times.append(time() - t0) return rv setattr(s._event, "wait", wait_logging) - s.EMPTY_QUEUE_TIMEOUT = 0.1 - - s.enter(0.4, lambda: None) - t = Thread(target=s.run) - t.start() - s.enter(0.6, None) # this task doesn't trigger a reschedule - sleep(0.1) - s.enter(0.1, lambda: None) # this triggers a reschedule - t.join() + + yield times + times.append(time() - t0) - for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]): - assert got == pytest.approx(want, 0.2), times + + +def noop(): + pass diff --git a/tests/pool/test_sched_async.py b/tests/pool/test_sched_async.py index 259c66383..23c7cefbd 100644 --- a/tests/pool/test_sched_async.py +++ b/tests/pool/test_sched_async.py @@ -1,22 +1,24 @@ -import asyncio import logging from time import time -from asyncio import create_task from functools import partial +from contextlib import asynccontextmanager import pytest +from ..utils import spawn, gather, asleep + try: from psycopg_pool.sched_async import AsyncScheduler except ImportError: # Tests should have been skipped if the package is not available pass -pytestmark = [pytest.mark.anyio, pytest.mark.timing] +pytestmark = [pytest.mark.timing] +if True: # ASYNC: + pytestmark.append(pytest.mark.anyio) @pytest.mark.slow -@pytest.mark.timing async def test_sched(): s = AsyncScheduler() results = [] @@ -38,10 +40,9 @@ async def test_sched(): @pytest.mark.slow -@pytest.mark.timing async def test_sched_task(): s = AsyncScheduler() - t = create_task(s.run()) + t = spawn(s.run) results = [] @@ -54,7 +55,7 @@ async def test_sched_task(): await s.enter(0.3, None) await s.enter(0.2, partial(worker, 2)) - await asyncio.gather(t) + await gather(t) t1 = time() assert t1 - t0 == pytest.approx(0.3, 0.2) @@ -66,11 +67,10 @@ async def test_sched_task(): @pytest.mark.slow -@pytest.mark.timing async def test_sched_error(caplog): caplog.set_level(logging.WARNING, logger="psycopg") s = AsyncScheduler() - t = create_task(s.run()) + t = spawn(s.run) results = [] @@ -86,7 +86,7 @@ async def test_sched_error(caplog): await s.enter(0.3, partial(worker, 2)) await s.enter(0.2, error) - await asyncio.gather(t) + await gather(t) t1 = time() assert t1 - t0 == pytest.approx(0.4, 0.1) @@ -104,26 +104,14 @@ async def test_sched_error(caplog): async def test_empty_queue_timeout(): s = AsyncScheduler() - t0 = time() - times = [] + async with timed_wait(s) as times: + s.EMPTY_QUEUE_TIMEOUT = 0.2 - wait_orig = s._event.wait + t = spawn(s.run) + await asleep(0.5) + await s.enter(0.5, None) + await gather(t) - async def wait_logging(): - try: - rv = await wait_orig() - finally: - times.append(time() - t0) - return rv - - setattr(s._event, "wait", wait_logging) - s.EMPTY_QUEUE_TIMEOUT = 0.2 - - t = create_task(s.run()) - await asyncio.sleep(0.5) - await s.enter(0.5, None) - await asyncio.gather(t) - times.append(time() - t0) for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]): assert got == pytest.approx(want, 0.2), times @@ -132,30 +120,50 @@ async def test_empty_queue_timeout(): async def test_first_task_rescheduling(): s = AsyncScheduler() + async with timed_wait(s) as times: + s.EMPTY_QUEUE_TIMEOUT = 0.1 + + await s.enter(0.4, noop) + t = spawn(s.run) + await s.enter(0.6, None) # this task doesn't trigger a reschedule + await asleep(0.1) + await s.enter(0.1, noop) # this triggers a reschedule + await gather(t) + + for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]): + assert got == pytest.approx(want, 0.2), times + + +@asynccontextmanager +async def timed_wait(s): + """ + Hack the scheduler's Event.wait() function in order to log waited time. + + The context is a list where the times are accumulated. + """ t0 = time() times = [] wait_orig = s._event.wait - async def wait_logging(): + async def wait_logging(timeout=None): + if True: # ASYNC + args = () + else: + args = (timeout,) + try: - rv = await wait_orig() + rv = await wait_orig(*args) finally: times.append(time() - t0) return rv setattr(s._event, "wait", wait_logging) - s.EMPTY_QUEUE_TIMEOUT = 0.1 - async def noop(): - pass + yield times - await s.enter(0.4, noop) - t = create_task(s.run()) - await s.enter(0.6, None) # this task doesn't trigger a reschedule - await asyncio.sleep(0.1) - await s.enter(0.1, noop) # this triggers a reschedule - await asyncio.gather(t) times.append(time() - t0) - for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]): - assert got == pytest.approx(want, 0.2), times + + +async def noop(): + pass diff --git a/tests/utils.py b/tests/utils.py index 64d72fe7f..5b9b73c1a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,10 +1,14 @@ import gc import re import sys +import asyncio +import inspect import operator +from time import sleep as sleep # noqa: F401 -- re-export from typing import Callable, Optional, Tuple +from threading import Thread from contextlib import contextmanager, asynccontextmanager -from contextlib import closing as closing # noqa: F401 - re-export +from contextlib import closing as closing # noqa: F401 -- re-export import pytest @@ -226,3 +230,33 @@ def raiseif(cond, *args, **kwargs): with pytest.raises(*args, **kwargs) as ex: yield ex return + + +def spawn(f): + """ + Equivalent to asyncio.create_task or creating and running a Thread. + """ + if inspect.iscoroutinefunction(f): + return asyncio.create_task(f()) + else: + t = Thread(target=f, daemon=True) + t.start() + return t + + +def gather(*ts): + """ + Equivalent to asyncio.gather or Thread.join() + """ + if ts and inspect.isawaitable(ts[0]): + return asyncio.gather(*ts) + else: + for t in ts: + t.join() + + +def asleep(s): + """ + Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync. + """ + return asyncio.sleep(s) diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py index ab2c4fb8e..40ed2ccc6 100755 --- a/tools/async_to_sync.py +++ b/tools/async_to_sync.py @@ -179,12 +179,14 @@ class RenameAsyncToSync(ast.NodeTransformer): "aconn_cls": "conn_cls", "alist": "list", "anext": "next", + "asleep": "sleep", "apipeline": "pipeline", "asynccontextmanager": "contextmanager", "connection_async": "connection", "cursor_async": "cursor", "ensure_table_async": "ensure_table", "find_insert_problem_async": "find_insert_problem", + "psycopg_pool.sched_async": "psycopg_pool.sched", "wait_async": "wait", "wait_conn_async": "wait_conn", } diff --git a/tools/convert_async_to_sync.sh b/tools/convert_async_to_sync.sh index bea2963a8..983d9c8e4 100755 --- a/tools/convert_async_to_sync.sh +++ b/tools/convert_async_to_sync.sh @@ -21,6 +21,7 @@ for async in \ psycopg/psycopg/connection_async.py \ psycopg/psycopg/cursor_async.py \ psycopg_pool/psycopg_pool/sched_async.py \ + tests/pool/test_sched_async.py \ tests/test_client_cursor_async.py \ tests/test_connection_async.py \ tests/test_copy_async.py \