)
from sqlalchemy import util
+from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.engine.interfaces import _CoreAnyExecuteParams
from sqlalchemy.engine.result import Result, ScalarResult, TupleResult
from sqlalchemy.ext.asyncio import AsyncSession as _AsyncSession
from sqlalchemy.ext.asyncio.session import _EXECUTE_OPTIONS
from sqlalchemy.orm._typing import OrmExecuteOptionsParameter
from sqlalchemy.sql.base import Executable as _Executable
+from sqlalchemy.sql.dml import UpdateBase
from sqlalchemy.util.concurrency import greenlet_spawn
from typing_extensions import deprecated
_add_event: Optional[Any] = None,
) -> ScalarResult[_TSelectParam]: ...
+ @overload
+ async def exec(
+ self,
+ statement: UpdateBase,
+ *,
+ params: Optional[Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]] = None,
+ execution_options: Mapping[str, Any] = util.EMPTY_DICT,
+ bind_arguments: Optional[Dict[str, Any]] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> CursorResult[Any]: ...
+
async def exec(
self,
statement: Union[
Select[_TSelectParam],
SelectOfScalar[_TSelectParam],
Executable[_TSelectParam],
+ UpdateBase,
],
*,
params: Optional[Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]] = None,
bind_arguments: Optional[Dict[str, Any]] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Union[TupleResult[_TSelectParam], ScalarResult[_TSelectParam]]:
+ ) -> Union[
+ TupleResult[_TSelectParam], ScalarResult[_TSelectParam], CursorResult[Any]
+ ]:
if execution_options:
execution_options = util.immutabledict(execution_options).union(
_EXECUTE_OPTIONS
)
from sqlalchemy import util
+from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.engine.interfaces import _CoreAnyExecuteParams
from sqlalchemy.engine.result import Result, ScalarResult, TupleResult
from sqlalchemy.orm import Query as _Query
from sqlalchemy.orm._typing import OrmExecuteOptionsParameter
from sqlalchemy.sql._typing import _ColumnsClauseArgument
from sqlalchemy.sql.base import Executable as _Executable
+from sqlalchemy.sql.dml import UpdateBase
from sqlmodel.sql.base import Executable
from sqlmodel.sql.expression import Select, SelectOfScalar
from typing_extensions import deprecated
_add_event: Optional[Any] = None,
) -> ScalarResult[_TSelectParam]: ...
+ @overload
+ def exec(
+ self,
+ statement: UpdateBase,
+ *,
+ params: Optional[Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]] = None,
+ execution_options: Mapping[str, Any] = util.EMPTY_DICT,
+ bind_arguments: Optional[Dict[str, Any]] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> CursorResult[Any]: ...
+
def exec(
self,
statement: Union[
Select[_TSelectParam],
SelectOfScalar[_TSelectParam],
Executable[_TSelectParam],
+ UpdateBase,
],
*,
params: Optional[Union[Mapping[str, Any], Sequence[Mapping[str, Any]]]] = None,
bind_arguments: Optional[Dict[str, Any]] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Union[TupleResult[_TSelectParam], ScalarResult[_TSelectParam]]:
+ ) -> Union[
+ TupleResult[_TSelectParam], ScalarResult[_TSelectParam], CursorResult[Any]
+ ]:
results = super().execute(
statement,
params=params,