from ...engine import Row
from ...engine import RowMapping
from ...engine.interfaces import _CoreAnyExecuteParams
- from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
from ...engine.result import ScalarResult
from ...orm._typing import _IdentityKeyType
from ...orm._typing import _O
return await self._proxied.commit()
- async def connection(self, **kw: Any) -> AsyncConnection:
+ async def connection(
+ self,
+ bind_arguments: Optional[_BindArguments] = None,
+ execution_options: Optional[_ExecuteOptions] = None,
+ **kw: Any,
+ ) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
this :class:`.Session` object's transactional state.
""" # noqa: E501
- return await self._proxied.connection(**kw)
+ return await self._proxied.connection(
+ bind_arguments=bind_arguments,
+ execution_options=execution_options,
+ **kw,
+ )
async def delete(self, instance: object) -> None:
r"""Mark an instance as deleted.
async def scalar(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalar(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: TypedReturnsRows[Tuple[_T]],
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
async def stream_scalars(
self,
statement: Executable,
- params: Optional[_CoreSingleExecuteParams] = None,
+ params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,