"""
await greenlet_spawn(self._proxied.close)
+ async def aclose(self) -> None:
+ """Call the close() method of :class:`_asyncio.AsyncConnection`
+ """
+ await self.close()
+
async def exec_driver_sql(
self,
statement: str,
"""
await greenlet_spawn(self.sync_session.close)
+ async def aclose(self) -> None:
+ """Call the close() method of :class:`_asyncio.AsyncSession`
+ """
+ await self.close()
+
async def invalidate(self) -> None:
"""Close this Session, using connection invalidation.
import asyncio
+import contextlib
import inspect as stdlib_inspect
from unittest.mock import patch
with expect_raises(exc.TimeoutError):
await engine.connect()
+ @testing.requires.python310
+ @async_test
+ async def test_engine_aclose(self, async_engine):
+ users = self.tables.users
+ async with contextlib.aclosing(async_engine.connect()) as conn:
+ await conn.start()
+ trans = conn.begin()
+ await trans.start()
+ await conn.execute(delete(users))
+ await trans.commit()
+ assert conn.closed
+
@testing.requires.queue_pool
@async_test
async def test_pool_exhausted_no_timeout(self, async_engine):
"get_nested_transaction",
"in_transaction",
"in_nested_transaction",
+ "aclose",
}
SM = async_scoped_session(
from __future__ import annotations
+import contextlib
from typing import List
from typing import Optional
result = await async_session.scalar(stmt)
eq_(result, 7)
+ @testing.requires.python310
+ @async_test
+ async def test_session_aclose(self, async_session):
+ User = self.classes.User
+ u = User(name="u")
+ async with contextlib.aclosing(async_session) as session:
+ session.add(u)
+ await session.commit()
+ assert async_session.sync_session.identity_map.values() == []
+
@testing.combinations(
("scalars",), ("stream_scalars",), argnames="filter_"
)