--- /dev/null
+.. change::
+ :tags: usecase, asyncio
+ :tickets: 9731
+
+ Added a new helper mixin :class:`_asyncio.AsyncAttrs` that seeks to improve
+ the use of lazy-loader and other expired or deferred ORM attributes with
+ asyncio, providing a simple attribute accessor that provides an ``await``
+ interface to any ORM attribute, whether or not it needs to emit SQL.
+
+ .. seealso::
+
+ :class:`_asyncio.AsyncAttrs`
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
+ from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import selectinload
- class Base(DeclarativeBase):
+ class Base(AsyncAttrs, DeclarativeBase):
pass
id: Mapped[int] = mapped_column(primary_key=True)
data: Mapped[str]
create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
- bs: Mapped[List[B]] = relationship(lazy="raise")
+ bs: Mapped[List[B]] = relationship()
class B(Base):
# expire_on_commit=False allows
print(a1.data)
+ # alternatively, AsyncAttrs may be used to access any attribute
+ # as an awaitable (new in 2.0.13)
+ for b1 in await a1.awaitable_attrs.bs:
+ print(b1)
+
async def async_main() -> None:
engine = create_async_engine(
IO-on-attribute access may occur. Techniques that can be used to help
this are below, many of which are illustrated in the preceding example.
+* Attributes that are lazy-loading relationships, deferred columns or
+ expressions, or are being accessed in expiration scenarios can take advantage
+ of the :class:`_asyncio.AsyncAttrs` mixin. This mixin, when added to a
+ specific class or more generally to the Declarative ``Base`` superclass,
+ provides an accessor :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
+ which delivers any attribute as an awaitable::
+
+ from __future__ import annotations
+
+ from typing import List
+
+ from sqlalchemy.ext.asyncio import AsyncAttrs
+ from sqlalchemy.orm import DeclarativeBase
+ from sqlalchemy.orm import Mapped
+ from sqlalchemy.orm import relationship
+
+
+ class Base(AsyncAttrs, DeclarativeBase):
+ pass
+
+
+ class A(Base):
+ __tablename__ = "a"
+
+ # ... rest of mapping ...
+
+ bs: Mapped[List[B]] = relationship()
+
+
+ class B(Base):
+ __tablename__ = "b"
+
+ # ... rest of mapping ...
+
+ Accessing the ``A.bs`` collection on newly loaded instances of ``A`` when
+ eager loading is not in use will normally use :term:`lazy loading`, which in
+ order to succeed will usually emit IO to the database, which will fail under
+ asyncio as no implicit IO is allowed. To access this attribute directly under
+ asyncio without any prior loading operations, the attribute can be accessed
+ as an awaitable by indicating the :attr:`_asyncio.AsyncAttrs.awaitable_attrs`
+ prefix::
+
+ a1 = await (session.scalars(select(A))).one()
+ for b1 in await a1.awaitable_attrs.bs:
+ print(b1)
+
+ The :class:`_asyncio.AsyncAttrs` mixin provides a succinct facade over the
+ internal approach that's also used by the
+ :meth:`_asyncio.AsyncSession.run_sync` method.
+
+
+ .. versionadded:: 2.0.13
+
+ .. seealso::
+
+ :class:`_asyncio.AsyncAttrs`
+
+
* Collections can be replaced with **write only collections** that will never
emit IO implicitly, by using the :ref:`write_only_relationship` feature in
SQLAlchemy 2.0. Using this feature, collections are never read from, only
bullets below address specific techniques when using traditional lazy-loaded
relationships with asyncio, which requires more care.
-* If using traditional ORM relationships which are subject to lazy loading,
- relationships can be declared with ``lazy="raise"`` so that by
- default they will not attempt to emit SQL. In order to load collections,
- :term:`eager loading` must be used in all cases.
+* If not using :class:`_asyncio.AsyncAttrs`, relationships can be declared
+ with ``lazy="raise"`` so that by default they will not attempt to emit SQL.
+ In order to load collections, :term:`eager loading` would be used instead.
* The most useful eager loading strategy is the
:func:`_orm.selectinload` eager loader, which is employed in the previous
:members:
:inherited-members:
+.. autoclass:: AsyncAttrs
+ :members:
+
.. autoclass:: AsyncSession
:members:
:exclude-members: sync_session_class
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
+from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
-from sqlalchemy.orm import declarative_base
+from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
-Base = declarative_base()
+
+class Base(AsyncAttrs, DeclarativeBase):
+ pass
class A(Base):
create_date: Mapped[datetime.datetime] = mapped_column(
server_default=func.now()
)
- bs: Mapped[List[B]] = relationship(lazy="raise")
+ bs: Mapped[List[B]] = relationship()
class B(Base):
result = await session.scalars(select(A).order_by(A.id))
- a1 = result.first()
+ a1 = result.one()
a1.data = "new data"
await session.commit()
+ # use the AsyncAttrs interface to accommodate for a lazy load
+ for b1 in await a1.awaitable_attrs.bs:
+ print(b1)
+
asyncio.run(async_main())
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
+from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
-from sqlalchemy.orm import declarative_base
+from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import WriteOnlyMapped
-Base = declarative_base()
+
+class Base(AsyncAttrs, DeclarativeBase):
+ pass
class A(Base):
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
+from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
-from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
+from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
-Base = declarative_base()
+
+class Base(AsyncAttrs, DeclarativeBase):
+ pass
class A(Base):
from .session import async_object_session as async_object_session
from .session import async_session as async_session
from .session import async_sessionmaker as async_sessionmaker
+from .session import AsyncAttrs as AsyncAttrs
from .session import AsyncSession as AsyncSession
from .session import AsyncSessionTransaction as AsyncSessionTransaction
import asyncio
from typing import Any
+from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import Generic
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
+class AsyncAttrs:
+ """Mixin class which provides an awaitable accessor for all attributes.
+
+ E.g.::
+
+ from __future__ import annotations
+
+ from typing import List
+
+ from sqlalchemy import ForeignKey
+ from sqlalchemy import func
+ from sqlalchemy.ext.asyncio import AsyncAttrs
+ from sqlalchemy.orm import DeclarativeBase
+ from sqlalchemy.orm import Mapped
+ from sqlalchemy.orm import mapped_column
+ from sqlalchemy.orm import relationship
+
+
+ class Base(AsyncAttrs, DeclarativeBase):
+ pass
+
+
+ class A(Base):
+ __tablename__ = "a"
+
+ id: Mapped[int] = mapped_column(primary_key=True)
+ data: Mapped[str]
+ bs: Mapped[List[B]] = relationship()
+
+
+ class B(Base):
+ __tablename__ = "b"
+ id: Mapped[int] = mapped_column(primary_key=True)
+ a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
+ data: Mapped[str]
+
+ In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to
+ the declarative ``Base`` class where it takes effect for all subclasses.
+ This mixin adds a single new attribute
+ :attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will
+ yield the value of any attribute as an awaitable. This allows attributes
+ which may be subject to lazy loading or deferred / unexpiry loading to be
+ accessed such that IO can still be emitted::
+
+ a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
+
+ # use the lazy loader on ``a1.bs`` via the ``.async_attrs``
+ # interface, so that it may be awaited
+ for b1 in await a1.async_attrs.bs:
+ print(b1)
+
+ The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the
+ attribute that is approximately equivalent to using the
+ :meth:`_asyncio.AsyncSession.run_sync` method, e.g.::
+
+ for b1 in await async_session.run_sync(lambda sess: a1.bs):
+ print(b1)
+
+ .. versionadded:: 2.0.13
+
+ .. seealso::
+
+ :ref:`asyncio_orm_avoid_lazyloads`
+
+ """
+
+ class _AsyncAttrGetitem:
+ __slots__ = "_instance"
+
+ def __init__(self, _instance: Any):
+ self._instance = _instance
+
+ def __getattr__(self, name: str) -> Awaitable[Any]:
+ return greenlet_spawn(getattr, self._instance, name)
+
+ @property
+ def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem:
+ """provide a namespace of all attributes on this object wrapped
+ as awaitables.
+
+ e.g.::
+
+
+ a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
+
+ some_attribute = await a1.async_attrs.some_deferred_attribute
+ some_collection = await a1.async_attrs.some_collection
+
+ """ # noqa: E501
+
+ return AsyncAttrs._AsyncAttrGetitem(self)
+
+
@util.create_proxy_methods(
Session,
":class:`_orm.Session`",
to the database connection by running the given callable in a
specially instrumented greenlet.
- .. note::
+ .. tip::
The provided callable is invoked inline within the asyncio event
loop, and will block on traditional IO calls. IO within this
.. seealso::
+ :class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides
+ a similar feature more succinctly on a per-attribute basis
+
:meth:`.AsyncConnection.run_sync`
:ref:`session_run_sync`
+from __future__ import annotations
+
+from typing import List
+from typing import Optional
+
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import Identity
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import Sequence
+from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import update
from sqlalchemy.ext.asyncio import async_object_session
from sqlalchemy.ext.asyncio import async_sessionmaker
+from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import exc as async_exc
from sqlalchemy.ext.asyncio.base import ReversibleProxy
+from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy.orm import Mapped
+from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
+from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
def async_engine(self):
return engines.testing_engine(asyncio=True, transfer_staticpool=True)
+ # TODO: this seems to cause deadlocks in
+ # OverrideSyncSession for some reason
+ # @testing.fixture
+ # def async_engine(self, async_testing_engine):
+ # return async_testing_engine(transfer_staticpool=True)
+
@testing.fixture
def async_session(self, async_engine):
return AsyncSession(async_engine)
is_true(not isinstance(ass.sync_session, _MySession))
is_(ass.sync_session_class, Session)
+
+
+class AsyncAttrsTest(
+ testing.AssertsExecutionResults, _AsyncFixture, fixtures.TestBase
+):
+ __requires__ = ("async_dialect",)
+
+ @config.fixture
+ def decl_base(self, metadata):
+ _md = metadata
+
+ class Base(fixtures.ComparableEntity, AsyncAttrs, DeclarativeBase):
+ metadata = _md
+ type_annotation_map = {
+ str: String().with_variant(
+ String(50), "mysql", "mariadb", "oracle"
+ )
+ }
+
+ yield Base
+ Base.registry.dispose()
+
+ @testing.fixture
+ def async_engine(self, async_testing_engine):
+ yield async_testing_engine(transfer_staticpool=True)
+
+ @testing.fixture
+ def ab_fixture(self, decl_base):
+ class A(decl_base):
+ __tablename__ = "a"
+
+ id: Mapped[int] = mapped_column(Identity(), primary_key=True)
+ data: Mapped[Optional[str]]
+ bs: Mapped[List[B]] = relationship(order_by=lambda: B.id)
+
+ class B(decl_base):
+ __tablename__ = "b"
+ id: Mapped[int] = mapped_column(Identity(), primary_key=True)
+ a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
+ data: Mapped[Optional[str]]
+
+ decl_base.metadata.create_all(testing.db)
+
+ return A, B
+
+ @async_test
+ async def test_lazyloaders(self, async_engine, ab_fixture):
+ A, B = ab_fixture
+
+ async with AsyncSession(async_engine) as session:
+ b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
+ a1 = A(data="a1", bs=[b1, b2, b3])
+ session.add(a1)
+
+ await session.commit()
+
+ assert inspect(a1).expired
+
+ with self.assert_statement_count(async_engine.sync_engine, 1):
+ eq_(await a1.awaitable_attrs.data, "a1")
+
+ with self.assert_statement_count(async_engine.sync_engine, 1):
+ eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
+
+ # now it's loaded, lazy loading not used anymore
+ eq_(a1.bs, [b1, b2, b3])
+
+ @async_test
+ async def test_it_didnt_load_but_is_ok(self, async_engine, ab_fixture):
+ A, B = ab_fixture
+
+ async with AsyncSession(async_engine) as session:
+ b1, b2, b3 = B(data="b1"), B(data="b2"), B(data="b3")
+ a1 = A(data="a1", bs=[b1, b2, b3])
+ session.add(a1)
+
+ await session.commit()
+
+ async with AsyncSession(async_engine) as session:
+ a1 = (
+ await session.scalars(select(A).options(selectinload(A.bs)))
+ ).one()
+
+ with self.assert_statement_count(async_engine.sync_engine, 0):
+ eq_(await a1.awaitable_attrs.bs, [b1, b2, b3])
+
+ @async_test
+ async def test_the_famous_lazyloader_gotcha(
+ self, async_engine, ab_fixture
+ ):
+ A, B = ab_fixture
+
+ async with AsyncSession(async_engine) as session:
+ a1 = A(data="a1")
+ session.add(a1)
+
+ await session.flush()
+
+ with self.assert_statement_count(async_engine.sync_engine, 1):
+ eq_(await a1.awaitable_attrs.bs, [])