from ...engine import Row
from ...engine import RowMapping
from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
from ...engine.result import ScalarResult
from ...orm._typing import _IdentityKeyType
from ...orm._typing import _O
return await self._proxied.commit()
- async def connection(self, **kw: Any) -> AsyncConnection:
+ async def connection(
+ self,
+ bind_arguments: Optional[_BindArguments] = None,
+ execution_options: Optional[_ExecuteOptions] = None,
+ **kw: Any,
+ ) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
this :class:`.Session` object's transactional state.
""" # noqa: E501
- return await self._proxied.connection(**kw)
+ return await self._proxied.connection(
+ bind_arguments=bind_arguments,
+ execution_options=execution_options,
+ **kw,
+ )
async def delete(self, instance: object) -> None:
r"""Mark an instance as deleted.
async def scalar(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
from ...engine import RowMapping
from ...engine import ScalarResult
from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
from ...event import dispatcher
from ...orm._typing import _IdentityKeyType
from ...orm._typing import _O
async def scalar(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
mapper=mapper, clause=clause, bind=bind, **kw
)
- async def connection(self, **kw: Any) -> AsyncConnection:
+ async def connection(
+ self,
+ bind_arguments: Optional[_BindArguments] = None,
+ execution_options: Optional[_ExecuteOptions] = None,
+ **kw: Any,
+ ) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
this :class:`.Session` object's transactional state.
"""
sync_connection = await greenlet_spawn(
- self.sync_session.connection, **kw
+ self.sync_session.connection,
+ bind_arguments=bind_arguments,
+ execution_options=execution_options,
+ **kw,
)
return engine.AsyncConnection._retrieve_proxy_for_target(
sync_connection