From 1323b436d8612a2684908f30e35a38eb9d74a9f2 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 19 Jan 2022 14:31:52 -0500 Subject: [PATCH] Add AdaptedConnection.run_async Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI connection interface used by asyncio drivers, which allows methods to be called against the underlying "driver" connection directly within a sync-style function where the ``await`` keyword can't be used, such as within SQLAlchemy event handler functions. The method is analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method which translates async-style calls to sync-style. The method is useful for things like connection-pool on-connect handlers that need to invoke awaitable methods on the driver connection when it's first created. Fixes: #7580 Change-Id: I03c98a72bda0234deb19c00095b31a36f19bf36d (cherry picked from commit 09ad975505adb2118f229cb5b1a75c2c412420ae) --- doc/build/changelog/unreleased_14/7580.rst | 18 +++++++ doc/build/orm/extensions/asyncio.rst | 57 ++++++++++++++++++++-- lib/sqlalchemy/engine/interfaces.py | 30 ++++++++++++ test/ext/asyncio/test_engine_py3k.py | 25 ++++++++++ 4 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/7580.rst diff --git a/doc/build/changelog/unreleased_14/7580.rst b/doc/build/changelog/unreleased_14/7580.rst new file mode 100644 index 0000000000..fa02085b2b --- /dev/null +++ b/doc/build/changelog/unreleased_14/7580.rst @@ -0,0 +1,18 @@ +.. change:: + :tags: usecase, asyncio + :tickets: 7580 + + Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI + connection interface used by asyncio drivers, which allows methods to be + called against the underlying "driver" connection directly within a + sync-style function where the ``await`` keyword can't be used, such as + within SQLAlchemy event handler functions. The method is analogous to the + :meth:`_asyncio.AsyncConnection.run_sync` method which translates + async-style calls to sync-style. The method is useful for things like + connection-pool on-connect handlers that need to invoke awaitable methods + on the driver connection when it's first created. + + .. seealso:: + + :ref:`asyncio_events_run_async` + diff --git a/doc/build/orm/extensions/asyncio.rst b/doc/build/orm/extensions/asyncio.rst index a7d2fb16be..0851c52968 100644 --- a/doc/build/orm/extensions/asyncio.rst +++ b/doc/build/orm/extensions/asyncio.rst @@ -622,12 +622,63 @@ The above example prints something along the lines of:: to sync, and outgoing messages to the database API will be converted to asyncio transparently. +.. _asyncio_events_run_async: + +Using awaitable-only driver methods in connection pool and other events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +As discussed in the above section, event handlers such as those oriented +around the :class:`.PoolEvents` event handlers receive a sync-style "DBAPI" connection, +which is a wrapper object supplied by SQLAlchemy asyncio dialects to adapt +the underlying asyncio "driver" connection into one that can be used by +SQLAlchemy's internals. A special use case arises when the user-defined +implementation for such an event handler needs to make use of the +ultimate "driver" connection directly, using awaitable only methods on that +driver connection. One such example is the ``.set_type_codec()`` method +supplied by the asyncpg driver. + +To accommodate this use case, SQLAlchemy's :class:`.AdaptedConnection` +class provides a method :meth:`.AdaptedConnection.run_async` that allows +an awaitable function to be invoked within the "synchronous" context of +an event handler or other SQLAlchemy internal. This method is directly +analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method that +allows a sync-style method to run under async. + +:meth:`.AdaptedConnection.run_async` should be passed a function that will +accept the innermost "driver" connection as a single argument, and return +an awaitable that will be invoked by the :meth:`.AdaptedConnection.run_async` +method. The given function itself does not need to be declared as ``async``; +it's perfectly fine for it to be a Python ``lambda:``, as the return awaitable +value will be invoked after being returned:: + + from sqlalchemy.ext.asyncio import create_async_engine + from sqlalchemy import event + + engine = create_async_engine(...) + + @event.listens_for(engine.sync_engine, "connect") + def register_custom_types(dbapi_connection, ...): + dbapi_connection.run_async( + lambda connection: connection.set_type_codec('MyCustomType', encoder, decoder, ...) + ) + +Above, the object passed to the ``register_custom_types`` event handler +is an instance of :class:`.AdaptedConnection`, which provides a DBAPI-like +interface to an underlying async-only driver-level connection object. +The :meth:`.AdaptedConnection.run_async` method then provides access to an +awaitable environment where the underlying driver level connection may be +acted upon. + +.. versionadded:: 1.4.30 + + Using multiple asyncio event loops ---------------------------------- -An application that makes use of multiple event loops, for example by combining asyncio -with multithreading, should not share the same :class:`_asyncio.AsyncEngine` -with different event loops when using the default pool implementation. +An application that makes use of multiple event loops, for example in the +uncommon case of combining asyncio with multithreading, should not share the +same :class:`_asyncio.AsyncEngine` with different event loops when using the +default pool implementation. If an :class:`_asyncio.AsyncEngine` is be passed from one event loop to another, the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 0bfd8fb8b5..e86fa2b6e7 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -10,6 +10,7 @@ from .. import util from ..sql.compiler import Compiled # noqa from ..sql.compiler import TypeCompiler # noqa +from ..util.concurrency import await_only class Dialect(object): @@ -1752,5 +1753,34 @@ class AdaptedConnection(object): """The connection object as returned by the driver after a connect.""" return self._connection + def run_async(self, fn): + """Run the awaitable returned by the given function, which is passed + the raw asyncio driver connection. + + This is used to invoke awaitable-only methods on the driver connection + within the context of a "synchronous" method, like a connection + pool event handler. + + E.g.:: + + engine = create_async_engine(...) + + @event.listens_for(engine.sync_engine, "connect") + def register_custom_types(dbapi_connection, ...): + dbapi_connection.run_async( + lambda connection: connection.set_type_codec( + 'MyCustomType', encoder, decoder, ... + ) + ) + + .. versionadded:: 1.4.30 + + .. seealso:: + + :ref:`asyncio_events_run_async` + + """ + return await_only(fn(self._connection)) + def __repr__(self): return "" % self._connection diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index bd07bba0db..e88ef5464e 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -1,4 +1,5 @@ import asyncio +import inspect as stdlib_inspect from sqlalchemy import Column from sqlalchemy import create_engine @@ -220,6 +221,30 @@ class AsyncEngineTest(EngineFixture): eq_(async_engine.driver, sync_engine.driver) eq_(async_engine.echo, sync_engine.echo) + @async_test + async def test_run_async(self, async_engine): + async def test_meth(async_driver_connection): + # there's no method that's guaranteed to be on every + # driver, so just stringify it and compare that to the + # outside + return str(async_driver_connection) + + def run_sync_to_async(connection): + connection_fairy = connection.connection + async_return = connection_fairy.run_async( + lambda driver_connection: test_meth(driver_connection) + ) + assert not stdlib_inspect.iscoroutine(async_return) + return async_return + + async with async_engine.connect() as conn: + driver_connection = ( + await conn.get_raw_connection() + ).driver_connection + res = await conn.run_sync(run_sync_to_async) + assert not stdlib_inspect.iscoroutine(res) + eq_(res, str(driver_connection)) + @async_test async def test_engine_eq_ne(self, async_engine): e2 = _async_engine.AsyncEngine(async_engine.sync_engine) -- 2.47.2