--- /dev/null
+.. change::
+ :tags: bug, tests
+ :tickets: 11187
+
+ Backported to SQLAlchemy 2.0 an improvement to the test suite with regards
+ to how asyncio related tests are run, now using the newer Python 3.11
+ ``asyncio.Runner`` or a backported equivalent, rather than relying on the
+ previous implementation based on ``asyncio.get_running_loop()``. This
+ should hopefully prevent issues with large suite runs on CPU loaded
+ hardware where the event loop seems to become corrupted, leading to
+ cascading failures.
+
import inspect
from . import config
-from ..util.concurrency import _util_async_run
-from ..util.concurrency import _util_async_run_coroutine_function
+from ..util.concurrency import _AsyncUtil
# may be set to False if the
# --disable-asyncio flag is passed to the test runner.
ENABLE_ASYNCIO = True
+_async_util = _AsyncUtil() # it has lazy init so just always create one
+
+
+def _shutdown():
+ """called when the test finishes"""
+ _async_util.close()
def _run_coroutine_function(fn, *args, **kwargs):
- return _util_async_run_coroutine_function(fn, *args, **kwargs)
+ return _async_util.run(fn, *args, **kwargs)
def _assume_async(fn, *args, **kwargs):
if not ENABLE_ASYNCIO:
return fn(*args, **kwargs)
- return _util_async_run(fn, *args, **kwargs)
+ return _async_util.run_in_greenlet(fn, *args, **kwargs)
def _maybe_async_provisioning(fn, *args, **kwargs):
return fn(*args, **kwargs)
if config.any_async:
- return _util_async_run(fn, *args, **kwargs)
+ return _async_util.run_in_greenlet(fn, *args, **kwargs)
else:
return fn(*args, **kwargs)
is_async = config._current.is_async
if is_async:
- return _util_async_run(fn, *args, **kwargs)
+ return _async_util.run_in_greenlet(fn, *args, **kwargs)
else:
return fn(*args, **kwargs)
collect_types.dump_stats(session.config.option.dump_pyannotate)
+def pytest_unconfigure(config):
+ from sqlalchemy.testing import asyncio
+
+ asyncio._shutdown()
+
+
def pytest_collection_finish(session):
if session.config.option.dump_pyannotate:
from pyannotate_runtime import collect_types
from typing import Optional
from typing import TYPE_CHECKING
from typing import TypeVar
+from typing import Union
from .langhelpers import memoized_property
from .. import exc
+from ..util import py311
+from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import Self
from ..util.typing import TypeGuard
_T = TypeVar("_T")
self.mutex.release()
-def _util_async_run_coroutine_function(
- fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
-) -> Any:
- """for test suite/ util only"""
-
- loop = get_event_loop()
- if loop.is_running():
- raise Exception(
- "for async run coroutine we expect that no greenlet or event "
- "loop is running when we start out"
- )
- return loop.run_until_complete(fn(*args, **kwargs))
-
-
-def _util_async_run(
- fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
-) -> Any:
- """for test suite/ util only"""
-
- loop = get_event_loop()
- if not loop.is_running():
- return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
- else:
- # allow for a wrapped test function to call another
- assert isinstance(getcurrent(), _AsyncIoGreenlet)
- return fn(*args, **kwargs)
-
-
def get_event_loop() -> asyncio.AbstractEventLoop:
"""vendor asyncio.get_event_loop() for python 3.7 and above.
# avoid "During handling of the above exception, another exception..."
pass
return asyncio.get_event_loop_policy().get_event_loop()
+
+
+if py311:
+ _Runner = asyncio.Runner
+else:
+
+ class _Runner: # type: ignore[no-redef]
+ """Runner implementation for test only"""
+
+ _loop: Union[None, asyncio.AbstractEventLoop, Literal[False]]
+
+ def __init__(self) -> None:
+ self._loop = None
+
+ def __enter__(self) -> Self:
+ self._lazy_init()
+ return self
+
+ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ self.close()
+
+ def close(self) -> None:
+ if self._loop:
+ try:
+ self._loop.run_until_complete(
+ self._loop.shutdown_asyncgens()
+ )
+ finally:
+ self._loop.close()
+ self._loop = False
+
+ def get_loop(self) -> asyncio.AbstractEventLoop:
+ """Return embedded event loop."""
+ self._lazy_init()
+ assert self._loop
+ return self._loop
+
+ def run(self, coro: Coroutine[Any, Any, _T]) -> _T:
+ self._lazy_init()
+ assert self._loop
+ return self._loop.run_until_complete(coro)
+
+ def _lazy_init(self) -> None:
+ if self._loop is False:
+ raise RuntimeError("Runner is closed")
+ if self._loop is None:
+ self._loop = asyncio.new_event_loop()
import asyncio # noqa
import typing
+from typing import Any
+from typing import Callable
+from typing import Coroutine
+from typing import TypeVar
have_greenlet = False
greenlet_error = None
from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
from ._concurrency_py3k import is_exit_exception as is_exit_exception
from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
- from ._concurrency_py3k import (
- _util_async_run as _util_async_run,
- ) # noqa: F401
- from ._concurrency_py3k import (
- _util_async_run_coroutine_function as _util_async_run_coroutine_function, # noqa: F401, E501
- )
+ from ._concurrency_py3k import _Runner
+
+_T = TypeVar("_T")
+
+
+class _AsyncUtil:
+ """Asyncio util for test suite/ util only"""
+
+ def __init__(self) -> None:
+ if have_greenlet:
+ self.runner = _Runner()
+
+ def run(
+ self,
+ fn: Callable[..., Coroutine[Any, Any, _T]],
+ *args: Any,
+ **kwargs: Any,
+ ) -> _T:
+ """Run coroutine on the loop"""
+ return self.runner.run(fn(*args, **kwargs))
+
+ def run_in_greenlet(
+ self, fn: Callable[..., _T], *args: Any, **kwargs: Any
+ ) -> _T:
+ """Run sync function in greenlet. Support nested calls"""
+ if have_greenlet:
+ if self.runner.get_loop().is_running():
+ return fn(*args, **kwargs)
+ else:
+ return self.runner.run(greenlet_spawn(fn, *args, **kwargs))
+ else:
+ return fn(*args, **kwargs)
+
+ def close(self) -> None:
+ if have_greenlet:
+ self.runner.close()
+
if not typing.TYPE_CHECKING and not have_greenlet: