From 410a4f00ba701c5655fe5de69ab77e866fcc8ee5 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 12 Jun 2024 12:42:29 -0400 Subject: [PATCH] open up async greenlet for third parties Modified the internal representation used for adapting asyncio calls to greenlets to allow for duck-typed compatibility with third party libraries that implement SQLAlchemy's "greenlet-to-asyncio" pattern directly. Running code within a greenlet that features the attribute ``__sqlalchemy_greenlet_provider__ = True`` will allow calls to :func:`sqlalchemy.util.await_only` directly. Change-Id: I79c67264e1a642b9a80d3b46dc64bdda80acf0aa (cherry picked from commit c1e2d9180a14c74495b712e08d8156b92f907ac0) (cherry picked from commit 1a6ff466b29ad3a114a27f2776538d8d998db2dd) --- .../unreleased_14/greenlet_compat.rst | 10 ++++ lib/sqlalchemy/util/_concurrency_py3k.py | 49 ++++++++++--------- 2 files changed, 35 insertions(+), 24 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/greenlet_compat.rst diff --git a/doc/build/changelog/unreleased_14/greenlet_compat.rst b/doc/build/changelog/unreleased_14/greenlet_compat.rst new file mode 100644 index 0000000000..d9eb51cd9c --- /dev/null +++ b/doc/build/changelog/unreleased_14/greenlet_compat.rst @@ -0,0 +1,10 @@ +.. change:: + :tags: usecase, engine + + Modified the internal representation used for adapting asyncio calls to + greenlets to allow for duck-typed compatibility with third party libraries + that implement SQLAlchemy's "greenlet-to-asyncio" pattern directly. + Running code within a greenlet that features the attribute + ``__sqlalchemy_greenlet_provider__ = True`` will allow calls to + :func:`sqlalchemy.util.await_only` directly. + diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index cc5b1c2fae..1e4ffefa40 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -37,9 +37,11 @@ def is_exit_exception(e): class _AsyncIoGreenlet(greenlet.greenlet): + + __sqlalchemy_greenlet_provider__ = True + def __init__(self, fn, driver): greenlet.greenlet.__init__(self, fn, driver) - self.driver = driver if _has_gr_context: self.gr_context = driver.gr_context @@ -55,7 +57,7 @@ def await_only(awaitable: Coroutine) -> Any: """ # this is called in the context greenlet while running fn current = greenlet.getcurrent() - if not isinstance(current, _AsyncIoGreenlet): + if not getattr(current, "__sqlalchemy_greenlet_provider__", False): raise exc.MissingGreenlet( "greenlet_spawn has not been called; can't call await_only() " "here. Was IO attempted in an unexpected place?" @@ -65,7 +67,7 @@ def await_only(awaitable: Coroutine) -> Any: # a coroutine to run. Once the awaitable is done, the driver greenlet # switches back to this greenlet with the result of awaitable that is # then returned to the caller (or raised as error) - return current.driver.switch(awaitable) + return current.parent.switch(awaitable) def await_fallback(awaitable: Coroutine) -> Any: @@ -79,7 +81,7 @@ def await_fallback(awaitable: Coroutine) -> Any: """ # this is called in the context greenlet while running fn current = greenlet.getcurrent() - if not isinstance(current, _AsyncIoGreenlet): + if not getattr(current, "__sqlalchemy_greenlet_provider__", False): loop = get_event_loop() if loop.is_running(): raise exc.MissingGreenlet( @@ -89,7 +91,7 @@ def await_fallback(awaitable: Coroutine) -> Any: ) return loop.run_until_complete(awaitable) - return current.driver.switch(awaitable) + return current.parent.switch(awaitable) async def greenlet_spawn( @@ -111,24 +113,21 @@ async def greenlet_spawn( # coroutine to wait. If the context is dead the function has # returned, and its result can be returned. switch_occurred = False - try: - result = context.switch(*args, **kwargs) - while not context.dead: - switch_occurred = True - try: - # wait for a coroutine from await_only and then return its - # result back to it. - value = await result - except BaseException: - # this allows an exception to be raised within - # the moderated greenlet so that it can continue - # its expected flow. - result = context.throw(*sys.exc_info()) - else: - result = context.switch(value) - finally: - # clean up to avoid cycle resolution by gc - del context.driver + result = context.switch(*args, **kwargs) + while not context.dead: + switch_occurred = True + try: + # wait for a coroutine from await_only and then return its + # result back to it. + value = await result + except BaseException: + # this allows an exception to be raised within + # the moderated greenlet so that it can continue + # its expected flow. + result = context.throw(*sys.exc_info()) + else: + result = context.switch(value) + if _require_await and not switch_occurred: raise exc.AwaitRequired( "The current operation required an async execution but none was " @@ -175,7 +174,9 @@ def _util_async_run(fn, *args, **kwargs): return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs)) else: # allow for a wrapped test function to call another - assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet) + assert getattr( + greenlet.getcurrent(), "__sqlalchemy_greenlet_provider__", False + ) return fn(*args, **kwargs) -- 2.47.2