]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
use asyncio.Runner for tests
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 23 Mar 2024 17:02:46 +0000 (13:02 -0400)
committerNils Philippsen <nils@tiptoe.de>
Thu, 12 Jun 2025 11:31:58 +0000 (13:31 +0200)
Backported to SQLAlchemy 1.4 an improvement to the test suite with regards
to how asyncio related tests are run, now using the newer Python 3.11
``asyncio.Runner`` or a backported equivalent, rather than relying on the
previous implementation based on ``asyncio.get_running_loop()``.  This
should hopefully prevent issues with large suite runs on CPU loaded
hardware where the event loop seems to become corrupted, leading to
cascading failures.

Fixes: #12668
(cherry picked from commit 4bc12e6abda97386b2eb66aff21312d010e57e1d)

doc/build/changelog/unreleased_14/12668.rst [new file with mode: 0644]
lib/sqlalchemy/testing/asyncio.py
lib/sqlalchemy/testing/plugin/pytestplugin.py
lib/sqlalchemy/util/_concurrency_py3k.py
lib/sqlalchemy/util/concurrency.py

diff --git a/doc/build/changelog/unreleased_14/12668.rst b/doc/build/changelog/unreleased_14/12668.rst
new file mode 100644 (file)
index 0000000..200c1c5
--- /dev/null
@@ -0,0 +1,12 @@
+.. change::
+    :tags: bug, tests
+    :tickets: 12668
+
+    Backported to SQLAlchemy 1.4 an improvement to the test suite with regards
+    to how asyncio related tests are run, now using the newer Python 3.11
+    ``asyncio.Runner`` or a backported equivalent, rather than relying on the
+    previous implementation based on ``asyncio.get_running_loop()``.  This
+    should hopefully prevent issues with large suite runs on CPU loaded
+    hardware where the event loop seems to become corrupted, leading to
+    cascading failures.
+
index 5f15162002c7861373f088536571799c0de792ee..6d4201c49458662edbcbcbb9e984dd81abfbb5cd 100644 (file)
@@ -21,16 +21,21 @@ from functools import wraps
 import inspect
 
 from . import config
-from ..util.concurrency import _util_async_run
-from ..util.concurrency import _util_async_run_coroutine_function
+from ..util.concurrency import _AsyncUtil
 
 # may be set to False if the
 # --disable-asyncio flag is passed to the test runner.
 ENABLE_ASYNCIO = True
+_async_util = _AsyncUtil()  # it has lazy init so just always create one
+
+
+def _shutdown():
+    """called when the test finishes"""
+    _async_util.close()
 
 
 def _run_coroutine_function(fn, *args, **kwargs):
-    return _util_async_run_coroutine_function(fn, *args, **kwargs)
+    return _async_util.run(fn, *args, **kwargs)
 
 
 def _assume_async(fn, *args, **kwargs):
@@ -47,7 +52,7 @@ def _assume_async(fn, *args, **kwargs):
     if not ENABLE_ASYNCIO:
         return fn(*args, **kwargs)
 
-    return _util_async_run(fn, *args, **kwargs)
+    return _async_util.run_in_greenlet(fn, *args, **kwargs)
 
 
 def _maybe_async_provisioning(fn, *args, **kwargs):
@@ -66,7 +71,7 @@ def _maybe_async_provisioning(fn, *args, **kwargs):
         return fn(*args, **kwargs)
 
     if config.any_async:
-        return _util_async_run(fn, *args, **kwargs)
+        return _async_util.run_in_greenlet(fn, *args, **kwargs)
     else:
         return fn(*args, **kwargs)
 
@@ -87,7 +92,7 @@ def _maybe_async(fn, *args, **kwargs):
     is_async = config._current.is_async
 
     if is_async:
-        return _util_async_run(fn, *args, **kwargs)
+        return _async_util.run_in_greenlet(fn, *args, **kwargs)
     else:
         return fn(*args, **kwargs)
 
index 2be6e6cda5af2d3501a23ae4275dc262cebdd75f..b33dcdb0d846148fa5a66658c555167ad837235b 100644 (file)
@@ -140,6 +140,12 @@ def pytest_sessionfinish(session):
         collect_types.dump_stats(session.config.option.dump_pyannotate)
 
 
+def pytest_unconfigure(config):
+    from sqlalchemy.testing import asyncio
+
+    asyncio._shutdown()
+
+
 def pytest_collection_finish(session):
     if session.config.option.dump_pyannotate:
         from pyannotate_runtime import collect_types
index 141193ef06ea99baeb7a9ac9c2c9e419af5570ae..f20dcb05b517ae518bd20ed0f2d63560ff62f7ae 100644 (file)
@@ -10,12 +10,17 @@ import sys
 from typing import Any
 from typing import Callable
 from typing import Coroutine
+from typing import TypeVar
+from typing import Union
 
 import greenlet
 
 from . import compat
 from .langhelpers import memoized_property
 from .. import exc
+from ..util import py311
+
+_T = TypeVar("_T")
 
 # If greenlet.gr_context is present in current version of greenlet,
 # it will be set with the current context on creation.
@@ -154,32 +159,6 @@ class AsyncAdaptedLock:
         self.mutex.release()
 
 
-def _util_async_run_coroutine_function(fn, *args, **kwargs):
-    """for test suite/ util only"""
-
-    loop = get_event_loop()
-    if loop.is_running():
-        raise Exception(
-            "for async run coroutine we expect that no greenlet or event "
-            "loop is running when we start out"
-        )
-    return loop.run_until_complete(fn(*args, **kwargs))
-
-
-def _util_async_run(fn, *args, **kwargs):
-    """for test suite/ util only"""
-
-    loop = get_event_loop()
-    if not loop.is_running():
-        return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
-    else:
-        # allow for a wrapped test function to call another
-        assert getattr(
-            greenlet.getcurrent(), "__sqlalchemy_greenlet_provider__", False
-        )
-        return fn(*args, **kwargs)
-
-
 def get_event_loop():
     """vendor asyncio.get_event_loop() for python 3.7 and above.
 
@@ -193,3 +172,50 @@ def get_event_loop():
             return asyncio.get_event_loop_policy().get_event_loop()
     else:
         return asyncio.get_event_loop()
+
+
+if py311:
+    _Runner = asyncio.Runner
+else:
+
+    class _Runner:  # type: ignore[no-redef]
+        """Runner implementation for test only"""
+
+        _loop: Union[None, asyncio.AbstractEventLoop, bool]
+
+        def __init__(self) -> None:
+            self._loop = None
+
+        def __enter__(self):
+            self._lazy_init()
+            return self
+
+        def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+            self.close()
+
+        def close(self) -> None:
+            if self._loop:
+                try:
+                    self._loop.run_until_complete(
+                        self._loop.shutdown_asyncgens()
+                    )
+                finally:
+                    self._loop.close()
+                    self._loop = False
+
+        def get_loop(self) -> asyncio.AbstractEventLoop:
+            """Return embedded event loop."""
+            self._lazy_init()
+            assert self._loop
+            return self._loop
+
+        def run(self, coro: Coroutine[Any, Any, _T]) -> _T:
+            self._lazy_init()
+            assert self._loop
+            return self._loop.run_until_complete(coro)
+
+        def _lazy_init(self) -> None:
+            if self._loop is False:
+                raise RuntimeError("Runner is closed")
+            if self._loop is None:
+                self._loop = asyncio.new_event_loop()
index 7341dbe685cf59d515c0b57800295c4f00e511e4..29c633e16164e9b71ae37bb01cbb73dbe14cfbb7 100644 (file)
@@ -5,6 +5,11 @@
 # This module is part of SQLAlchemy and is released under
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
 
+from typing import Any
+from typing import Callable
+from typing import Coroutine
+from typing import TypeVar
+
 from . import compat
 
 have_greenlet = False
@@ -22,15 +27,48 @@ if compat.py3k:
         from ._concurrency_py3k import greenlet_spawn
         from ._concurrency_py3k import is_exit_exception
         from ._concurrency_py3k import AsyncAdaptedLock
-        from ._concurrency_py3k import _util_async_run  # noqa: F401
-        from ._concurrency_py3k import (
-            _util_async_run_coroutine_function,
-        )  # noqa: F401, E501
+        from ._concurrency_py3k import _Runner
         from ._concurrency_py3k import asyncio  # noqa: F401
 
-    # does not need greennlet, just Python 3
+    # does not need greenlet, just Python 3
     from ._compat_py3k import asynccontextmanager  # noqa: F401
 
+_T = TypeVar("_T")
+
+
+class _AsyncUtil:
+    """Asyncio util for test suite/ util only"""
+
+    def __init__(self) -> None:
+        if have_greenlet:
+            self.runner = _Runner()
+
+    def run(
+        self,
+        fn: Callable[..., Coroutine[Any, Any, _T]],
+        *args: Any,
+        **kwargs: Any,
+    ) -> _T:
+        """Run coroutine on the loop"""
+        return self.runner.run(fn(*args, **kwargs))
+
+    def run_in_greenlet(
+        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
+    ) -> _T:
+        """Run sync function in greenlet. Support nested calls"""
+        if have_greenlet:
+            if self.runner.get_loop().is_running():
+                return fn(*args, **kwargs)
+            else:
+                return self.runner.run(greenlet_spawn(fn, *args, **kwargs))
+        else:
+            return fn(*args, **kwargs)
+
+    def close(self) -> None:
+        if have_greenlet:
+            self.runner.close()
+
+
 if not have_greenlet:
 
     asyncio = None  # noqa: F811