From: Mike Bayer Date: Wed, 19 Jan 2022 19:31:52 +0000 (-0500) Subject: Add AdaptedConnection.run_async X-Git-Tag: rel_2_0_0b1~533^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=09ad975505adb2118f229cb5b1a75c2c412420ae;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add AdaptedConnection.run_async Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI connection interface used by asyncio drivers, which allows methods to be called against the underlying "driver" connection directly within a sync-style function where the ``await`` keyword can't be used, such as within SQLAlchemy event handler functions. The method is analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method which translates async-style calls to sync-style. The method is useful for things like connection-pool on-connect handlers that need to invoke awaitable methods on the driver connection when it's first created. Fixes: #7580 Change-Id: I03c98a72bda0234deb19c00095b31a36f19bf36d --- diff --git a/doc/build/changelog/unreleased_14/7580.rst b/doc/build/changelog/unreleased_14/7580.rst new file mode 100644 index 0000000000..fa02085b2b --- /dev/null +++ b/doc/build/changelog/unreleased_14/7580.rst @@ -0,0 +1,18 @@ +.. change:: + :tags: usecase, asyncio + :tickets: 7580 + + Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI + connection interface used by asyncio drivers, which allows methods to be + called against the underlying "driver" connection directly within a + sync-style function where the ``await`` keyword can't be used, such as + within SQLAlchemy event handler functions. The method is analogous to the + :meth:`_asyncio.AsyncConnection.run_sync` method which translates + async-style calls to sync-style. The method is useful for things like + connection-pool on-connect handlers that need to invoke awaitable methods + on the driver connection when it's first created. + + .. seealso:: + + :ref:`asyncio_events_run_async` + diff --git a/doc/build/orm/extensions/asyncio.rst b/doc/build/orm/extensions/asyncio.rst index 7d60d809a4..4aff3130b2 100644 --- a/doc/build/orm/extensions/asyncio.rst +++ b/doc/build/orm/extensions/asyncio.rst @@ -622,12 +622,63 @@ The above example prints something along the lines of:: to sync, and outgoing messages to the database API will be converted to asyncio transparently. +.. _asyncio_events_run_async: + +Using awaitable-only driver methods in connection pool and other events +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +As discussed in the above section, event handlers such as those oriented +around the :class:`.PoolEvents` event handlers receive a sync-style "DBAPI" connection, +which is a wrapper object supplied by SQLAlchemy asyncio dialects to adapt +the underlying asyncio "driver" connection into one that can be used by +SQLAlchemy's internals. A special use case arises when the user-defined +implementation for such an event handler needs to make use of the +ultimate "driver" connection directly, using awaitable only methods on that +driver connection. One such example is the ``.set_type_codec()`` method +supplied by the asyncpg driver. + +To accommodate this use case, SQLAlchemy's :class:`.AdaptedConnection` +class provides a method :meth:`.AdaptedConnection.run_async` that allows +an awaitable function to be invoked within the "synchronous" context of +an event handler or other SQLAlchemy internal. This method is directly +analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method that +allows a sync-style method to run under async. + +:meth:`.AdaptedConnection.run_async` should be passed a function that will +accept the innermost "driver" connection as a single argument, and return +an awaitable that will be invoked by the :meth:`.AdaptedConnection.run_async` +method. The given function itself does not need to be declared as ``async``; +it's perfectly fine for it to be a Python ``lambda:``, as the return awaitable +value will be invoked after being returned:: + + from sqlalchemy.ext.asyncio import create_async_engine + from sqlalchemy import event + + engine = create_async_engine(...) + + @event.listens_for(engine.sync_engine, "connect") + def register_custom_types(dbapi_connection, ...): + dbapi_connection.run_async( + lambda connection: connection.set_type_codec('MyCustomType', encoder, decoder, ...) + ) + +Above, the object passed to the ``register_custom_types`` event handler +is an instance of :class:`.AdaptedConnection`, which provides a DBAPI-like +interface to an underlying async-only driver-level connection object. +The :meth:`.AdaptedConnection.run_async` method then provides access to an +awaitable environment where the underlying driver level connection may be +acted upon. + +.. versionadded:: 1.4.30 + + Using multiple asyncio event loops ---------------------------------- -An application that makes use of multiple event loops, for example by combining asyncio -with multithreading, should not share the same :class:`_asyncio.AsyncEngine` -with different event loops when using the default pool implementation. +An application that makes use of multiple event loops, for example in the +uncommon case of combining asyncio with multithreading, should not share the +same :class:`_asyncio.AsyncEngine` with different event loops when using the +default pool implementation. If an :class:`_asyncio.AsyncEngine` is be passed from one event loop to another, the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 80c848379c..62477f077b 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -23,6 +23,7 @@ from typing import Union from ..pool import PoolProxiedConnection from ..sql.compiler import Compiled # noqa from ..sql.compiler import TypeCompiler # noqa +from ..util.concurrency import await_only from ..util.typing import _TypeToInstance from ..util.typing import NotRequired from ..util.typing import Protocol @@ -2311,5 +2312,34 @@ class AdaptedConnection: """The connection object as returned by the driver after a connect.""" return self._connection + def run_async(self, fn): + """Run the awaitable returned by the given function, which is passed + the raw asyncio driver connection. + + This is used to invoke awaitable-only methods on the driver connection + within the context of a "synchronous" method, like a connection + pool event handler. + + E.g.:: + + engine = create_async_engine(...) + + @event.listens_for(engine.sync_engine, "connect") + def register_custom_types(dbapi_connection, ...): + dbapi_connection.run_async( + lambda connection: connection.set_type_codec( + 'MyCustomType', encoder, decoder, ... + ) + ) + + .. versionadded:: 1.4.30 + + .. seealso:: + + :ref:`asyncio_events_run_async` + + """ + return await_only(fn(self._connection)) + def __repr__(self): return "" % self._connection diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 7e680bd0ec..4de8ae8bb6 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -1,4 +1,5 @@ import asyncio +import inspect as stdlib_inspect from sqlalchemy import Column from sqlalchemy import create_engine @@ -220,6 +221,30 @@ class AsyncEngineTest(EngineFixture): eq_(async_engine.driver, sync_engine.driver) eq_(async_engine.echo, sync_engine.echo) + @async_test + async def test_run_async(self, async_engine): + async def test_meth(async_driver_connection): + # there's no method that's guaranteed to be on every + # driver, so just stringify it and compare that to the + # outside + return str(async_driver_connection) + + def run_sync_to_async(connection): + connection_fairy = connection.connection + async_return = connection_fairy.run_async( + lambda driver_connection: test_meth(driver_connection) + ) + assert not stdlib_inspect.iscoroutine(async_return) + return async_return + + async with async_engine.connect() as conn: + driver_connection = ( + await conn.get_raw_connection() + ).driver_connection + res = await conn.run_sync(run_sync_to_async) + assert not stdlib_inspect.iscoroutine(res) + eq_(res, str(driver_connection)) + @async_test async def test_engine_eq_ne(self, async_engine): e2 = _async_engine.AsyncEngine(async_engine.sync_engine)