from ...engine.base import Transaction
from ...exc import ArgumentError
from ...util.concurrency import greenlet_spawn
+from ...util.typing import TupleAny
from ...util.typing import TypeVarTuple
from ...util.typing import Unpack
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[Any]]:
+ ) -> GeneratorStartableContext[AsyncResult[Unpack[TupleAny]]]:
...
@asyncstartablecontext
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> AsyncIterator[AsyncResult[Any]]:
+ ) -> AsyncIterator[AsyncResult[Unpack[TupleAny]]]:
"""Execute a statement and return an awaitable yielding a
:class:`_asyncio.AsyncResult` object.
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
async def execute(
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
...
async def stream(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
r"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
...
async def stream(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> RowReturningQuery[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.query
result = await conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
# stream with direct await
async_result = await conn.stream(text("select * from table"))
- # EXPECTED_TYPE: AsyncResult[Any]
+ # EXPECTED_TYPE: AsyncResult[Unpack[.*tuple[Any, ...]]]
reveal_type(async_result)
# stream with context manager
async with conn.stream(
text("select * from table")
) as ctx_async_result:
- # EXPECTED_TYPE: AsyncResult[Any]
+ # EXPECTED_TYPE: AsyncResult[Unpack[.*tuple[Any, ...]]]
reveal_type(ctx_async_result)
# stream_scalars with direct await
result = await conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)