]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add AdaptedConnection.run_async
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Jan 2022 19:31:52 +0000 (14:31 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 19 Jan 2022 20:48:28 +0000 (15:48 -0500)
Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI
connection interface used by asyncio drivers, which allows methods to be
called against the underlying "driver" connection directly within a
sync-style function where the ``await`` keyword can't be used, such as
within SQLAlchemy event handler functions. The method is analogous to the
:meth:`_asyncio.AsyncConnection.run_sync` method which translates
async-style calls to sync-style. The method is useful for things like
connection-pool on-connect handlers that need to invoke awaitable methods
on the driver connection when it's first created.

Fixes: #7580
Change-Id: I03c98a72bda0234deb19c00095b31a36f19bf36d

doc/build/changelog/unreleased_14/7580.rst [new file with mode: 0644]
doc/build/orm/extensions/asyncio.rst
lib/sqlalchemy/engine/interfaces.py
test/ext/asyncio/test_engine_py3k.py

diff --git a/doc/build/changelog/unreleased_14/7580.rst b/doc/build/changelog/unreleased_14/7580.rst
new file mode 100644 (file)
index 0000000..fa02085
--- /dev/null
@@ -0,0 +1,18 @@
+.. change::
+    :tags: usecase, asyncio
+    :tickets: 7580
+
+    Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI
+    connection interface used by asyncio drivers, which allows methods to be
+    called against the underlying "driver" connection directly within a
+    sync-style function where the ``await`` keyword can't be used, such as
+    within SQLAlchemy event handler functions. The method is analogous to the
+    :meth:`_asyncio.AsyncConnection.run_sync` method which translates
+    async-style calls to sync-style. The method is useful for things like
+    connection-pool on-connect handlers that need to invoke awaitable methods
+    on the driver connection when it's first created.
+
+    .. seealso::
+
+        :ref:`asyncio_events_run_async`
+
index 7d60d809a4b28bd75d2145542d58483fd25c1ebe..4aff3130b29c44f7d7a2941a9aad30ec8be474a1 100644 (file)
@@ -622,12 +622,63 @@ The above example prints something along the lines of::
     to sync, and outgoing messages to the database API will be converted
     to asyncio transparently.
 
+.. _asyncio_events_run_async:
+
+Using awaitable-only driver methods in connection pool and other events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+As discussed in the above section, event handlers such as those oriented
+around the :class:`.PoolEvents` event handlers receive a sync-style "DBAPI" connection,
+which is a wrapper object supplied by SQLAlchemy asyncio dialects to adapt
+the underlying asyncio "driver" connection into one that can be used by
+SQLAlchemy's internals.    A special use case arises when the user-defined
+implementation for such an event handler needs to make use of the
+ultimate "driver" connection directly, using awaitable only methods on that
+driver connection.  One such example is the ``.set_type_codec()`` method
+supplied by the asyncpg driver.
+
+To accommodate this use case, SQLAlchemy's :class:`.AdaptedConnection`
+class provides a method :meth:`.AdaptedConnection.run_async` that allows
+an awaitable function to be invoked within the "synchronous" context of
+an event handler or other SQLAlchemy internal.  This method is directly
+analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method that
+allows a sync-style method to run under async.
+
+:meth:`.AdaptedConnection.run_async` should be passed a function that will
+accept the innermost "driver" connection as a single argument, and return
+an awaitable that will be invoked by the :meth:`.AdaptedConnection.run_async`
+method.  The given function itself does not need to be declared as ``async``;
+it's perfectly fine for it to be a Python ``lambda:``, as the return awaitable
+value will be invoked after being returned::
+
+    from sqlalchemy.ext.asyncio import create_async_engine
+    from sqlalchemy import event
+
+    engine = create_async_engine(...)
+
+    @event.listens_for(engine.sync_engine, "connect")
+    def register_custom_types(dbapi_connection, ...):
+        dbapi_connection.run_async(
+            lambda connection: connection.set_type_codec('MyCustomType', encoder, decoder, ...)
+        )
+
+Above, the object passed to the ``register_custom_types`` event handler
+is an instance of :class:`.AdaptedConnection`, which provides a DBAPI-like
+interface to an underlying async-only driver-level connection object.
+The :meth:`.AdaptedConnection.run_async` method then provides access to an
+awaitable environment where the underlying driver level connection may be
+acted upon.
+
+.. versionadded:: 1.4.30
+
+
 Using multiple asyncio event loops
 ----------------------------------
 
-An application that makes use of multiple event loops, for example by combining asyncio
-with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
-with different event loops when using the default pool implementation.
+An application that makes use of multiple event loops, for example in the
+uncommon case of combining asyncio with multithreading, should not share the
+same :class:`_asyncio.AsyncEngine` with different event loops when using the
+default pool implementation.
 
 If an :class:`_asyncio.AsyncEngine` is be passed from one event loop to another,
 the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
index 80c848379cf23a999ed77c58d268e1e37c1549e1..62477f077bcfb561cdb6bb5b0dde2936f6bd89da 100644 (file)
@@ -23,6 +23,7 @@ from typing import Union
 from ..pool import PoolProxiedConnection
 from ..sql.compiler import Compiled  # noqa
 from ..sql.compiler import TypeCompiler  # noqa
+from ..util.concurrency import await_only
 from ..util.typing import _TypeToInstance
 from ..util.typing import NotRequired
 from ..util.typing import Protocol
@@ -2311,5 +2312,34 @@ class AdaptedConnection:
         """The connection object as returned by the driver after a connect."""
         return self._connection
 
+    def run_async(self, fn):
+        """Run the awaitable returned by the given function, which is passed
+        the raw asyncio driver connection.
+
+        This is used to invoke awaitable-only methods on the driver connection
+        within the context of a "synchronous" method, like a connection
+        pool event handler.
+
+        E.g.::
+
+            engine = create_async_engine(...)
+
+            @event.listens_for(engine.sync_engine, "connect")
+            def register_custom_types(dbapi_connection, ...):
+                dbapi_connection.run_async(
+                    lambda connection: connection.set_type_codec(
+                        'MyCustomType', encoder, decoder, ...
+                    )
+                )
+
+        .. versionadded:: 1.4.30
+
+        .. seealso::
+
+            :ref:`asyncio_events_run_async`
+
+        """
+        return await_only(fn(self._connection))
+
     def __repr__(self):
         return "<AdaptedConnection %s>" % self._connection
index 7e680bd0ec0fa9c422500419e7e6700150511e61..4de8ae8bb66bd1eba6becd967a06496fc26d17a4 100644 (file)
@@ -1,4 +1,5 @@
 import asyncio
+import inspect as stdlib_inspect
 
 from sqlalchemy import Column
 from sqlalchemy import create_engine
@@ -220,6 +221,30 @@ class AsyncEngineTest(EngineFixture):
         eq_(async_engine.driver, sync_engine.driver)
         eq_(async_engine.echo, sync_engine.echo)
 
+    @async_test
+    async def test_run_async(self, async_engine):
+        async def test_meth(async_driver_connection):
+            # there's no method that's guaranteed to be on every
+            # driver, so just stringify it and compare that to the
+            # outside
+            return str(async_driver_connection)
+
+        def run_sync_to_async(connection):
+            connection_fairy = connection.connection
+            async_return = connection_fairy.run_async(
+                lambda driver_connection: test_meth(driver_connection)
+            )
+            assert not stdlib_inspect.iscoroutine(async_return)
+            return async_return
+
+        async with async_engine.connect() as conn:
+            driver_connection = (
+                await conn.get_raw_connection()
+            ).driver_connection
+            res = await conn.run_sync(run_sync_to_async)
+            assert not stdlib_inspect.iscoroutine(res)
+            eq_(res, str(driver_connection))
+
     @async_test
     async def test_engine_eq_ne(self, async_engine):
         e2 = _async_engine.AsyncEngine(async_engine.sync_engine)