--- /dev/null
+.. change::
+ :tags: usecase, asyncio
+ :tickets: 7580
+
+ Added new method :meth:`.AdaptedConnection.run_async` to the DBAPI
+ connection interface used by asyncio drivers, which allows methods to be
+ called against the underlying "driver" connection directly within a
+ sync-style function where the ``await`` keyword can't be used, such as
+ within SQLAlchemy event handler functions. The method is analogous to the
+ :meth:`_asyncio.AsyncConnection.run_sync` method which translates
+ async-style calls to sync-style. The method is useful for things like
+ connection-pool on-connect handlers that need to invoke awaitable methods
+ on the driver connection when it's first created.
+
+ .. seealso::
+
+ :ref:`asyncio_events_run_async`
+
to sync, and outgoing messages to the database API will be converted
to asyncio transparently.
+.. _asyncio_events_run_async:
+
+Using awaitable-only driver methods in connection pool and other events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+As discussed in the above section, event handlers such as those oriented
+around the :class:`.PoolEvents` event handlers receive a sync-style "DBAPI" connection,
+which is a wrapper object supplied by SQLAlchemy asyncio dialects to adapt
+the underlying asyncio "driver" connection into one that can be used by
+SQLAlchemy's internals. A special use case arises when the user-defined
+implementation for such an event handler needs to make use of the
+ultimate "driver" connection directly, using awaitable only methods on that
+driver connection. One such example is the ``.set_type_codec()`` method
+supplied by the asyncpg driver.
+
+To accommodate this use case, SQLAlchemy's :class:`.AdaptedConnection`
+class provides a method :meth:`.AdaptedConnection.run_async` that allows
+an awaitable function to be invoked within the "synchronous" context of
+an event handler or other SQLAlchemy internal. This method is directly
+analogous to the :meth:`_asyncio.AsyncConnection.run_sync` method that
+allows a sync-style method to run under async.
+
+:meth:`.AdaptedConnection.run_async` should be passed a function that will
+accept the innermost "driver" connection as a single argument, and return
+an awaitable that will be invoked by the :meth:`.AdaptedConnection.run_async`
+method. The given function itself does not need to be declared as ``async``;
+it's perfectly fine for it to be a Python ``lambda:``, as the return awaitable
+value will be invoked after being returned::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy import event
+
+ engine = create_async_engine(...)
+
+ @event.listens_for(engine.sync_engine, "connect")
+ def register_custom_types(dbapi_connection, ...):
+ dbapi_connection.run_async(
+ lambda connection: connection.set_type_codec('MyCustomType', encoder, decoder, ...)
+ )
+
+Above, the object passed to the ``register_custom_types`` event handler
+is an instance of :class:`.AdaptedConnection`, which provides a DBAPI-like
+interface to an underlying async-only driver-level connection object.
+The :meth:`.AdaptedConnection.run_async` method then provides access to an
+awaitable environment where the underlying driver level connection may be
+acted upon.
+
+.. versionadded:: 1.4.30
+
+
Using multiple asyncio event loops
----------------------------------
-An application that makes use of multiple event loops, for example by combining asyncio
-with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
-with different event loops when using the default pool implementation.
+An application that makes use of multiple event loops, for example in the
+uncommon case of combining asyncio with multithreading, should not share the
+same :class:`_asyncio.AsyncEngine` with different event loops when using the
+default pool implementation.
If an :class:`_asyncio.AsyncEngine` is be passed from one event loop to another,
the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
from ..pool import PoolProxiedConnection
from ..sql.compiler import Compiled # noqa
from ..sql.compiler import TypeCompiler # noqa
+from ..util.concurrency import await_only
from ..util.typing import _TypeToInstance
from ..util.typing import NotRequired
from ..util.typing import Protocol
"""The connection object as returned by the driver after a connect."""
return self._connection
+ def run_async(self, fn):
+ """Run the awaitable returned by the given function, which is passed
+ the raw asyncio driver connection.
+
+ This is used to invoke awaitable-only methods on the driver connection
+ within the context of a "synchronous" method, like a connection
+ pool event handler.
+
+ E.g.::
+
+ engine = create_async_engine(...)
+
+ @event.listens_for(engine.sync_engine, "connect")
+ def register_custom_types(dbapi_connection, ...):
+ dbapi_connection.run_async(
+ lambda connection: connection.set_type_codec(
+ 'MyCustomType', encoder, decoder, ...
+ )
+ )
+
+ .. versionadded:: 1.4.30
+
+ .. seealso::
+
+ :ref:`asyncio_events_run_async`
+
+ """
+ return await_only(fn(self._connection))
+
def __repr__(self):
return "<AdaptedConnection %s>" % self._connection
import asyncio
+import inspect as stdlib_inspect
from sqlalchemy import Column
from sqlalchemy import create_engine
eq_(async_engine.driver, sync_engine.driver)
eq_(async_engine.echo, sync_engine.echo)
+ @async_test
+ async def test_run_async(self, async_engine):
+ async def test_meth(async_driver_connection):
+ # there's no method that's guaranteed to be on every
+ # driver, so just stringify it and compare that to the
+ # outside
+ return str(async_driver_connection)
+
+ def run_sync_to_async(connection):
+ connection_fairy = connection.connection
+ async_return = connection_fairy.run_async(
+ lambda driver_connection: test_meth(driver_connection)
+ )
+ assert not stdlib_inspect.iscoroutine(async_return)
+ return async_return
+
+ async with async_engine.connect() as conn:
+ driver_connection = (
+ await conn.get_raw_connection()
+ ).driver_connection
+ res = await conn.run_sync(run_sync_to_async)
+ assert not stdlib_inspect.iscoroutine(res)
+ eq_(res, str(driver_connection))
+
@async_test
async def test_engine_eq_ne(self, async_engine):
e2 = _async_engine.AsyncEngine(async_engine.sync_engine)