]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
use asyncio.Runner for tests
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 23 Mar 2024 17:02:46 +0000 (13:02 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 23 Mar 2024 17:45:18 +0000 (13:45 -0400)
Backported to SQLAlchemy 2.0 an improvement to the test suite with regards
to how asyncio related tests are run, now using the newer Python 3.11
``asyncio.Runner`` or a backported equivalent, rather than relying on the
previous implementation based on ``asyncio.get_running_loop()``.  This
should hopefully prevent issues with large suite runs on CPU loaded
hardware where the event loop seems to become corrupted, leading to
cascading failures.

Fixes: #11187
Change-Id: I867b2478b9ba3a152fbfef380650eb987527ba46

doc/build/changelog/unreleased_20/11187.rst [new file with mode: 0644]
lib/sqlalchemy/testing/asyncio.py
lib/sqlalchemy/testing/plugin/pytestplugin.py
lib/sqlalchemy/util/_concurrency_py3k.py
lib/sqlalchemy/util/concurrency.py

diff --git a/doc/build/changelog/unreleased_20/11187.rst b/doc/build/changelog/unreleased_20/11187.rst
new file mode 100644 (file)
index 0000000..be16ef3
--- /dev/null
@@ -0,0 +1,12 @@
+.. change::
+    :tags: bug, tests
+    :tickets: 11187
+
+    Backported to SQLAlchemy 2.0 an improvement to the test suite with regards
+    to how asyncio related tests are run, now using the newer Python 3.11
+    ``asyncio.Runner`` or a backported equivalent, rather than relying on the
+    previous implementation based on ``asyncio.get_running_loop()``.  This
+    should hopefully prevent issues with large suite runs on CPU loaded
+    hardware where the event loop seems to become corrupted, leading to
+    cascading failures.
+
index 17dc861c95ad931359ba1da95d0bf9a4ffc71b41..f71ca57fe57147fcf61e9b152085832f58d3505a 100644 (file)
@@ -24,16 +24,21 @@ from functools import wraps
 import inspect
 
 from . import config
-from ..util.concurrency import _util_async_run
-from ..util.concurrency import _util_async_run_coroutine_function
+from ..util.concurrency import _AsyncUtil
 
 # may be set to False if the
 # --disable-asyncio flag is passed to the test runner.
 ENABLE_ASYNCIO = True
+_async_util = _AsyncUtil()  # it has lazy init so just always create one
+
+
+def _shutdown():
+    """called when the test finishes"""
+    _async_util.close()
 
 
 def _run_coroutine_function(fn, *args, **kwargs):
-    return _util_async_run_coroutine_function(fn, *args, **kwargs)
+    return _async_util.run(fn, *args, **kwargs)
 
 
 def _assume_async(fn, *args, **kwargs):
@@ -50,7 +55,7 @@ def _assume_async(fn, *args, **kwargs):
     if not ENABLE_ASYNCIO:
         return fn(*args, **kwargs)
 
-    return _util_async_run(fn, *args, **kwargs)
+    return _async_util.run_in_greenlet(fn, *args, **kwargs)
 
 
 def _maybe_async_provisioning(fn, *args, **kwargs):
@@ -69,7 +74,7 @@ def _maybe_async_provisioning(fn, *args, **kwargs):
         return fn(*args, **kwargs)
 
     if config.any_async:
-        return _util_async_run(fn, *args, **kwargs)
+        return _async_util.run_in_greenlet(fn, *args, **kwargs)
     else:
         return fn(*args, **kwargs)
 
@@ -89,7 +94,7 @@ def _maybe_async(fn, *args, **kwargs):
     is_async = config._current.is_async
 
     if is_async:
-        return _util_async_run(fn, *args, **kwargs)
+        return _async_util.run_in_greenlet(fn, *args, **kwargs)
     else:
         return fn(*args, **kwargs)
 
index b63e06359c5dfb337886e89b99a609f21da2dfd4..1a4d4bb30a179ceb48467e66d5d85ce49bcb0554 100644 (file)
@@ -182,6 +182,12 @@ def pytest_sessionfinish(session):
         collect_types.dump_stats(session.config.option.dump_pyannotate)
 
 
+def pytest_unconfigure(config):
+    from sqlalchemy.testing import asyncio
+
+    asyncio._shutdown()
+
+
 def pytest_collection_finish(session):
     if session.config.option.dump_pyannotate:
         from pyannotate_runtime import collect_types
index 42ceb8122ee9c85082ea566216d3415686e040cc..defef1f6bf3c12facc54dc13a9e0abf96b44b39a 100644 (file)
@@ -19,10 +19,14 @@ from typing import Coroutine
 from typing import Optional
 from typing import TYPE_CHECKING
 from typing import TypeVar
+from typing import Union
 
 from .langhelpers import memoized_property
 from .. import exc
+from ..util import py311
+from ..util.typing import Literal
 from ..util.typing import Protocol
+from ..util.typing import Self
 from ..util.typing import TypeGuard
 
 _T = TypeVar("_T")
@@ -225,34 +229,6 @@ class AsyncAdaptedLock:
         self.mutex.release()
 
 
-def _util_async_run_coroutine_function(
-    fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
-) -> Any:
-    """for test suite/ util only"""
-
-    loop = get_event_loop()
-    if loop.is_running():
-        raise Exception(
-            "for async run coroutine we expect that no greenlet or event "
-            "loop is running when we start out"
-        )
-    return loop.run_until_complete(fn(*args, **kwargs))
-
-
-def _util_async_run(
-    fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
-) -> Any:
-    """for test suite/ util only"""
-
-    loop = get_event_loop()
-    if not loop.is_running():
-        return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
-    else:
-        # allow for a wrapped test function to call another
-        assert isinstance(getcurrent(), _AsyncIoGreenlet)
-        return fn(*args, **kwargs)
-
-
 def get_event_loop() -> asyncio.AbstractEventLoop:
     """vendor asyncio.get_event_loop() for python 3.7 and above.
 
@@ -265,3 +241,50 @@ def get_event_loop() -> asyncio.AbstractEventLoop:
         # avoid "During handling of the above exception, another exception..."
         pass
     return asyncio.get_event_loop_policy().get_event_loop()
+
+
+if py311:
+    _Runner = asyncio.Runner
+else:
+
+    class _Runner:  # type: ignore[no-redef]
+        """Runner implementation for test only"""
+
+        _loop: Union[None, asyncio.AbstractEventLoop, Literal[False]]
+
+        def __init__(self) -> None:
+            self._loop = None
+
+        def __enter__(self) -> Self:
+            self._lazy_init()
+            return self
+
+        def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+            self.close()
+
+        def close(self) -> None:
+            if self._loop:
+                try:
+                    self._loop.run_until_complete(
+                        self._loop.shutdown_asyncgens()
+                    )
+                finally:
+                    self._loop.close()
+                    self._loop = False
+
+        def get_loop(self) -> asyncio.AbstractEventLoop:
+            """Return embedded event loop."""
+            self._lazy_init()
+            assert self._loop
+            return self._loop
+
+        def run(self, coro: Coroutine[Any, Any, _T]) -> _T:
+            self._lazy_init()
+            assert self._loop
+            return self._loop.run_until_complete(coro)
+
+        def _lazy_init(self) -> None:
+            if self._loop is False:
+                raise RuntimeError("Runner is closed")
+            if self._loop is None:
+                self._loop = asyncio.new_event_loop()
index 65a62052125f0b2d457d7c977d379015275b3731..96ba416cabfaebaa17376596053a5ac472bb061d 100644 (file)
@@ -10,6 +10,10 @@ from __future__ import annotations
 
 import asyncio  # noqa
 import typing
+from typing import Any
+from typing import Callable
+from typing import Coroutine
+from typing import TypeVar
 
 have_greenlet = False
 greenlet_error = None
@@ -26,12 +30,43 @@ else:
     from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
     from ._concurrency_py3k import is_exit_exception as is_exit_exception
     from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
-    from ._concurrency_py3k import (
-        _util_async_run as _util_async_run,
-    )  # noqa: F401
-    from ._concurrency_py3k import (
-        _util_async_run_coroutine_function as _util_async_run_coroutine_function,  # noqa: F401, E501
-    )
+    from ._concurrency_py3k import _Runner
+
+_T = TypeVar("_T")
+
+
+class _AsyncUtil:
+    """Asyncio util for test suite/ util only"""
+
+    def __init__(self) -> None:
+        if have_greenlet:
+            self.runner = _Runner()
+
+    def run(
+        self,
+        fn: Callable[..., Coroutine[Any, Any, _T]],
+        *args: Any,
+        **kwargs: Any,
+    ) -> _T:
+        """Run coroutine on the loop"""
+        return self.runner.run(fn(*args, **kwargs))
+
+    def run_in_greenlet(
+        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
+    ) -> _T:
+        """Run sync function in greenlet. Support nested calls"""
+        if have_greenlet:
+            if self.runner.get_loop().is_running():
+                return fn(*args, **kwargs)
+            else:
+                return self.runner.run(greenlet_spawn(fn, *args, **kwargs))
+        else:
+            return fn(*args, **kwargs)
+
+    def close(self) -> None:
+        if have_greenlet:
+            self.runner.close()
+
 
 if not typing.TYPE_CHECKING and not have_greenlet: