From 4bc12e6abda97386b2eb66aff21312d010e57e1d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 23 Mar 2024 13:02:46 -0400 Subject: [PATCH] use asyncio.Runner for tests Backported to SQLAlchemy 2.0 an improvement to the test suite with regards to how asyncio related tests are run, now using the newer Python 3.11 ``asyncio.Runner`` or a backported equivalent, rather than relying on the previous implementation based on ``asyncio.get_running_loop()``. This should hopefully prevent issues with large suite runs on CPU loaded hardware where the event loop seems to become corrupted, leading to cascading failures. Fixes: #11187 Change-Id: I867b2478b9ba3a152fbfef380650eb987527ba46 --- doc/build/changelog/unreleased_20/11187.rst | 12 +++ lib/sqlalchemy/testing/asyncio.py | 17 ++-- lib/sqlalchemy/testing/plugin/pytestplugin.py | 6 ++ lib/sqlalchemy/util/_concurrency_py3k.py | 79 ++++++++++++------- lib/sqlalchemy/util/concurrency.py | 47 +++++++++-- 5 files changed, 121 insertions(+), 40 deletions(-) create mode 100644 doc/build/changelog/unreleased_20/11187.rst diff --git a/doc/build/changelog/unreleased_20/11187.rst b/doc/build/changelog/unreleased_20/11187.rst new file mode 100644 index 0000000000..be16ef301e --- /dev/null +++ b/doc/build/changelog/unreleased_20/11187.rst @@ -0,0 +1,12 @@ +.. change:: + :tags: bug, tests + :tickets: 11187 + + Backported to SQLAlchemy 2.0 an improvement to the test suite with regards + to how asyncio related tests are run, now using the newer Python 3.11 + ``asyncio.Runner`` or a backported equivalent, rather than relying on the + previous implementation based on ``asyncio.get_running_loop()``. This + should hopefully prevent issues with large suite runs on CPU loaded + hardware where the event loop seems to become corrupted, leading to + cascading failures. + diff --git a/lib/sqlalchemy/testing/asyncio.py b/lib/sqlalchemy/testing/asyncio.py index 17dc861c95..f71ca57fe5 100644 --- a/lib/sqlalchemy/testing/asyncio.py +++ b/lib/sqlalchemy/testing/asyncio.py @@ -24,16 +24,21 @@ from functools import wraps import inspect from . import config -from ..util.concurrency import _util_async_run -from ..util.concurrency import _util_async_run_coroutine_function +from ..util.concurrency import _AsyncUtil # may be set to False if the # --disable-asyncio flag is passed to the test runner. ENABLE_ASYNCIO = True +_async_util = _AsyncUtil() # it has lazy init so just always create one + + +def _shutdown(): + """called when the test finishes""" + _async_util.close() def _run_coroutine_function(fn, *args, **kwargs): - return _util_async_run_coroutine_function(fn, *args, **kwargs) + return _async_util.run(fn, *args, **kwargs) def _assume_async(fn, *args, **kwargs): @@ -50,7 +55,7 @@ def _assume_async(fn, *args, **kwargs): if not ENABLE_ASYNCIO: return fn(*args, **kwargs) - return _util_async_run(fn, *args, **kwargs) + return _async_util.run_in_greenlet(fn, *args, **kwargs) def _maybe_async_provisioning(fn, *args, **kwargs): @@ -69,7 +74,7 @@ def _maybe_async_provisioning(fn, *args, **kwargs): return fn(*args, **kwargs) if config.any_async: - return _util_async_run(fn, *args, **kwargs) + return _async_util.run_in_greenlet(fn, *args, **kwargs) else: return fn(*args, **kwargs) @@ -89,7 +94,7 @@ def _maybe_async(fn, *args, **kwargs): is_async = config._current.is_async if is_async: - return _util_async_run(fn, *args, **kwargs) + return _async_util.run_in_greenlet(fn, *args, **kwargs) else: return fn(*args, **kwargs) diff --git a/lib/sqlalchemy/testing/plugin/pytestplugin.py b/lib/sqlalchemy/testing/plugin/pytestplugin.py index b63e06359c..1a4d4bb30a 100644 --- a/lib/sqlalchemy/testing/plugin/pytestplugin.py +++ b/lib/sqlalchemy/testing/plugin/pytestplugin.py @@ -182,6 +182,12 @@ def pytest_sessionfinish(session): collect_types.dump_stats(session.config.option.dump_pyannotate) +def pytest_unconfigure(config): + from sqlalchemy.testing import asyncio + + asyncio._shutdown() + + def pytest_collection_finish(session): if session.config.option.dump_pyannotate: from pyannotate_runtime import collect_types diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index 42ceb8122e..defef1f6bf 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -19,10 +19,14 @@ from typing import Coroutine from typing import Optional from typing import TYPE_CHECKING from typing import TypeVar +from typing import Union from .langhelpers import memoized_property from .. import exc +from ..util import py311 +from ..util.typing import Literal from ..util.typing import Protocol +from ..util.typing import Self from ..util.typing import TypeGuard _T = TypeVar("_T") @@ -225,34 +229,6 @@ class AsyncAdaptedLock: self.mutex.release() -def _util_async_run_coroutine_function( - fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any -) -> Any: - """for test suite/ util only""" - - loop = get_event_loop() - if loop.is_running(): - raise Exception( - "for async run coroutine we expect that no greenlet or event " - "loop is running when we start out" - ) - return loop.run_until_complete(fn(*args, **kwargs)) - - -def _util_async_run( - fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any -) -> Any: - """for test suite/ util only""" - - loop = get_event_loop() - if not loop.is_running(): - return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs)) - else: - # allow for a wrapped test function to call another - assert isinstance(getcurrent(), _AsyncIoGreenlet) - return fn(*args, **kwargs) - - def get_event_loop() -> asyncio.AbstractEventLoop: """vendor asyncio.get_event_loop() for python 3.7 and above. @@ -265,3 +241,50 @@ def get_event_loop() -> asyncio.AbstractEventLoop: # avoid "During handling of the above exception, another exception..." pass return asyncio.get_event_loop_policy().get_event_loop() + + +if py311: + _Runner = asyncio.Runner +else: + + class _Runner: # type: ignore[no-redef] + """Runner implementation for test only""" + + _loop: Union[None, asyncio.AbstractEventLoop, Literal[False]] + + def __init__(self) -> None: + self._loop = None + + def __enter__(self) -> Self: + self._lazy_init() + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.close() + + def close(self) -> None: + if self._loop: + try: + self._loop.run_until_complete( + self._loop.shutdown_asyncgens() + ) + finally: + self._loop.close() + self._loop = False + + def get_loop(self) -> asyncio.AbstractEventLoop: + """Return embedded event loop.""" + self._lazy_init() + assert self._loop + return self._loop + + def run(self, coro: Coroutine[Any, Any, _T]) -> _T: + self._lazy_init() + assert self._loop + return self._loop.run_until_complete(coro) + + def _lazy_init(self) -> None: + if self._loop is False: + raise RuntimeError("Runner is closed") + if self._loop is None: + self._loop = asyncio.new_event_loop() diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py index 65a6205212..96ba416cab 100644 --- a/lib/sqlalchemy/util/concurrency.py +++ b/lib/sqlalchemy/util/concurrency.py @@ -10,6 +10,10 @@ from __future__ import annotations import asyncio # noqa import typing +from typing import Any +from typing import Callable +from typing import Coroutine +from typing import TypeVar have_greenlet = False greenlet_error = None @@ -26,12 +30,43 @@ else: from ._concurrency_py3k import greenlet_spawn as greenlet_spawn from ._concurrency_py3k import is_exit_exception as is_exit_exception from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock - from ._concurrency_py3k import ( - _util_async_run as _util_async_run, - ) # noqa: F401 - from ._concurrency_py3k import ( - _util_async_run_coroutine_function as _util_async_run_coroutine_function, # noqa: F401, E501 - ) + from ._concurrency_py3k import _Runner + +_T = TypeVar("_T") + + +class _AsyncUtil: + """Asyncio util for test suite/ util only""" + + def __init__(self) -> None: + if have_greenlet: + self.runner = _Runner() + + def run( + self, + fn: Callable[..., Coroutine[Any, Any, _T]], + *args: Any, + **kwargs: Any, + ) -> _T: + """Run coroutine on the loop""" + return self.runner.run(fn(*args, **kwargs)) + + def run_in_greenlet( + self, fn: Callable[..., _T], *args: Any, **kwargs: Any + ) -> _T: + """Run sync function in greenlet. Support nested calls""" + if have_greenlet: + if self.runner.get_loop().is_running(): + return fn(*args, **kwargs) + else: + return self.runner.run(greenlet_spawn(fn, *args, **kwargs)) + else: + return fn(*args, **kwargs) + + def close(self) -> None: + if have_greenlet: + self.runner.close() + if not typing.TYPE_CHECKING and not have_greenlet: -- 2.47.2