"__iter__",
"add",
"add_all",
- "delete",
"expire",
"expire_all",
"expunge",
)
return _result.AsyncResult(result)
+ async def delete(self, instance):
+ """Mark an instance as deleted.
+
+ The database delete operation occurs upon ``flush()``.
+
+ As this operation may need to cascade along unloaded relationships,
+ it is awaitable to allow for those queries to take place.
+
+
+ """
+ return await greenlet_spawn(self.sync_session.delete, instance)
+
async def merge(self, instance, load=True):
"""Copy the state of a given instance into a corresponding instance
within this :class:`_asyncio.AsyncSession`.
from sqlalchemy import testing
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker
from sqlalchemy.testing import async_test
eq_(await conn.scalar(select(func.count(User.id))), 1)
- async_session.delete(u1)
+ await async_session.delete(u1)
await async_session.flush()
eq_(result.all(), [])
+class AsyncCascadesTest(AsyncFixture):
+ run_inserts = None
+
+ @classmethod
+ def setup_mappers(cls):
+ User, Address = cls.classes("User", "Address")
+ users, addresses = cls.tables("users", "addresses")
+
+ cls.mapper(
+ User,
+ users,
+ properties={
+ "addresses": relationship(
+ Address, cascade="all, delete-orphan"
+ )
+ },
+ )
+ cls.mapper(
+ Address,
+ addresses,
+ )
+
+ @async_test
+ async def test_delete_w_cascade(self, async_session):
+ User = self.classes.User
+ Address = self.classes.Address
+
+ async with async_session.begin():
+ u1 = User(id=1, name="u1", addresses=[Address(email_address="e1")])
+
+ async_session.add(u1)
+
+ async with async_session.begin():
+ u1 = (await async_session.execute(select(User))).scalar_one()
+
+ await async_session.delete(u1)
+
+ eq_(
+ (
+ await async_session.execute(
+ select(func.count()).select_from(Address)
+ )
+ ).scalar(),
+ 0,
+ )
+
+
class AsyncEventTest(AsyncFixture):
"""The engine events all run in their normal synchronous context.