from .. import util
from ..sql import compiler
from ..sql import util as sql_util
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from . import CursorResult
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
@overload
def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Tuple[Unpack[_Ts]]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[_T]:
+ ) -> CursorResult[Unpack[_Ts]]:
...
@overload
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
...
def execute(
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
func: FunctionElement[Any],
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(
ddl: ExecutableDDLElement,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""Execute a schema.DDL object."""
execution_options = ddl._execution_options.merge_with(
elem: Executable,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""Execute a sql.ClauseElement object."""
execution_options = elem._execution_options.merge_with(
compiled: Compiled,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""Execute a sql.Compiled object.
TODO: why do we have this? likely deprecate or remove
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
r"""Executes a string SQL statement on the DBAPI cursor directly,
without any SQL compilation steps.
execution_options: _ExecuteOptions,
*args: Any,
**kw: Any,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
context: ExecutionContext,
statement: Union[str, Compiled],
parameters: Optional[_AnyMultiExecuteParams],
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""continue the _execute_context() method for a single DBAPI
cursor.execute() or cursor.executemany() call.
self,
dialect: Dialect,
context: ExecutionContext,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
"""continue the _execute_context() method for an "insertmanyvalues"
operation, which will invoke DBAPI
cursor.execute() one or more times with individual log and
class ResultInternal(InPlaceGenerative, Generic[_R]):
__slots__ = ()
- _real_result: Optional[Result[Any]] = None
+ _real_result: Optional[Result[Unpack[Tuple[Any, ...]]]] = None
_generate_rows: bool = True
_row_logging_fn: Optional[Callable[[Any], Any]]
@HasMemoized_ro_memoized_attribute
def _row_getter(self) -> Optional[Callable[..., _R]]:
- real_result: Result[Any] = (
+ real_result: Result[Unpack[Tuple[Any, ...]]] = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[Tuple[Any, ...]]]", self)
)
if real_result._source_supports_scalars:
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def iterrows(self: Result[Any]) -> Iterator[_R]:
+ def iterrows(
+ self: Result[Unpack[Tuple[Any, ...]]]
+ ) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
obj: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
else:
- def iterrows(self: Result[Any]) -> Iterator[_R]:
+ def iterrows(
+ self: Result[Unpack[Tuple[Any, ...]]]
+ ) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
row: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
+ def onerow(
+ self: Result[Unpack[Tuple[Any, ...]]]
+ ) -> Union[_NoRow, _R]:
_onerow = self._fetchone_impl
while True:
row = _onerow()
else:
- def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
+ def onerow(
+ self: Result[Unpack[Tuple[Any, ...]]]
+ ) -> Union[_NoRow, _R]:
row = self._fetchone_impl()
if row is None:
return _NO_ROW
real_result = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[Tuple[Any, ...]]]", self)
)
if real_result._yield_per:
num_required = num = real_result._yield_per
real_result = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[Tuple[Any, ...]]]", self)
)
num = real_result._yield_per
real_result = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[Tuple[Any, ...]]]", self)
)
if not real_result._source_supports_scalars or len(indexes) != 1:
real_result = (
self._real_result
if self._real_result is not None
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[Tuple[Any, ...]]]", self)
)
if not strategy and self._metadata._unique_filters:
_post_creational_filter: Optional[Callable[[Any], Any]]
- _real_result: Result[Any]
+ _real_result: Result[Unpack[Tuple[Any, ...]]]
def __enter__(self) -> Self:
return self
from ...util.concurrency import greenlet_spawn
from ...util.typing import Literal
from ...util.typing import Self
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
if TYPE_CHECKING:
from ...engine import CursorResult
_T = TypeVar("_T", bound=Any)
_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
class AsyncCommon(FilterResult[_R]):
__slots__ = ()
- _real_result: Result[Any]
+ _real_result: Result[Unpack[Tuple[Any, ...]]]
_metadata: ResultMetaData
async def close(self) -> None: # type: ignore[override]
return self._real_result.closed
-class AsyncResult(_WithKeys, AsyncCommon[Row[_TP]]):
+class AsyncResult(_WithKeys, AsyncCommon[Row[Unpack[_Ts]]]):
"""An asyncio wrapper around a :class:`_result.Result` object.
The :class:`_asyncio.AsyncResult` only applies to statement executions that
__slots__ = ()
- _real_result: Result[_TP]
+ _real_result: Result[Unpack[_Ts]]
- def __init__(self, real_result: Result[_TP]):
+ def __init__(self, real_result: Result[Unpack[_Ts]]):
self._real_result = real_result
self._metadata = real_result._metadata
)
@property
- def t(self) -> AsyncTupleResult[_TP]:
+ def t(self) -> AsyncTupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
The :attr:`_asyncio.AsyncResult.t` attribute is a synonym for
"""
return self # type: ignore
- def tuples(self) -> AsyncTupleResult[_TP]:
+ def tuples(self) -> AsyncTupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
This method returns the same :class:`_asyncio.AsyncResult` object
async def partitions(
self, size: Optional[int] = None
- ) -> AsyncIterator[Sequence[Row[_TP]]]:
+ ) -> AsyncIterator[Sequence[Row[Unpack[_Ts]]]]:
"""Iterate through sub-lists of rows of the size given.
An async iterator is returned::
else:
break
- async def fetchall(self) -> Sequence[Row[_TP]]:
+ async def fetchall(self) -> Sequence[Row[Unpack[_Ts]]]:
"""A synonym for the :meth:`_asyncio.AsyncResult.all` method.
.. versionadded:: 2.0
return await greenlet_spawn(self._allrows)
- async def fetchone(self) -> Optional[Row[_TP]]:
+ async def fetchone(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch one row.
When all rows are exhausted, returns None.
async def fetchmany(
self, size: Optional[int] = None
- ) -> Sequence[Row[_TP]]:
+ ) -> Sequence[Row[Unpack[_Ts]]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
return await greenlet_spawn(self._manyrow_getter, self, size)
- async def all(self) -> Sequence[Row[_TP]]:
+ async def all(self) -> Sequence[Row[Unpack[_Ts]]]:
"""Return all rows in a list.
Closes the result set after invocation. Subsequent invocations
return await greenlet_spawn(self._allrows)
- def __aiter__(self) -> AsyncResult[_TP]:
+ def __aiter__(self) -> AsyncResult[Unpack[_Ts]]:
return self
- async def __anext__(self) -> Row[_TP]:
+ async def __anext__(self) -> Row[Unpack[_Ts]]:
row = await greenlet_spawn(self._onerow_getter, self)
if row is _NO_ROW:
raise StopAsyncIteration()
else:
return row
- async def first(self) -> Optional[Row[_TP]]:
+ async def first(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch the first row or ``None`` if no row is present.
Closes the result set and discards remaining rows.
"""
return await greenlet_spawn(self._only_one_row, False, False, False)
- async def one_or_none(self) -> Optional[Row[_TP]]:
+ async def one_or_none(self) -> Optional[Row[Unpack[_Ts]]]:
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
"""
return await greenlet_spawn(self._only_one_row, True, False, True)
- async def one(self) -> Row[_TP]:
+ async def one(self) -> Row[Unpack[_Ts]]:
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
"""
return await greenlet_spawn(self._only_one_row, False, False, True)
- async def freeze(self) -> FrozenResult[_TP]:
+ async def freeze(self) -> FrozenResult[Tuple[Unpack[_Ts]]]:
"""Return a callable object that will produce copies of this
:class:`_asyncio.AsyncResult` when invoked.
_generate_rows = False
- def __init__(self, real_result: Result[Any], index: _KeyIndexType):
+ def __init__(
+ self,
+ real_result: Result[Unpack[Tuple[Any, ...]]],
+ index: _KeyIndexType,
+ ):
self._real_result = real_result
if real_result._source_supports_scalars:
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result[Any]):
+ def __init__(self, result: Result[Unpack[Tuple[Any, ...]]]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
...
-_RT = TypeVar("_RT", bound="Result[Any]")
+_RT = TypeVar("_RT", bound="Result[Unpack[Tuple[Any, ...]]]")
async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT:
from ...util import ScopedRegistry
from ...util import warn
from ...util import warn_deprecated
+from ...util.typing import Unpack
if TYPE_CHECKING:
from .engine import AsyncConnection
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
from ...orm import SessionTransaction
from ...orm import state as _instance_state
from ...util.concurrency import greenlet_spawn
+from ...util.typing import Unpack
if TYPE_CHECKING:
from .engine import AsyncConnection
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
from typing import Iterable
from typing import Optional
from typing import overload
+from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
from ..sql.dml import UpdateDMLState
from ..util import EMPTY_DICT
from ..util.typing import Literal
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import DMLStrategyArgument
update_changed_only: bool,
use_orm_update_stmt: Optional[dml.Update] = ...,
enable_check_rowcount: bool = True,
-) -> _result.Result[Any]:
+) -> _result.Result[Unpack[Tuple[Any, ...]]]:
...
update_changed_only: bool,
use_orm_update_stmt: Optional[dml.Update] = None,
enable_check_rowcount: bool = True,
-) -> Optional[_result.Result[Any]]:
+) -> Optional[_result.Result[Unpack[Tuple[Any, ...]]]]:
base_mapper = mapper.base_mapper
search_keys = mapper._primary_key_propkeys
"are 'raw', 'orm', 'bulk', 'auto"
)
- result: _result.Result[Any]
+ result: _result.Result[Unpack[Tuple[Any, ...]]]
if insert_options._dml_strategy == "raw":
result = conn.execute(
"are 'orm', 'auto', 'bulk', 'core_only'"
)
- result: _result.Result[Any]
+ result: _result.Result[Unpack[Tuple[Any, ...]]]
if update_options._dml_strategy == "bulk":
enable_check_rowcount = not statement._where_criteria
from ..util import warn_deprecated
from ..util.typing import RODescriptorReference
from ..util.typing import TypedDict
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from ._typing import _EntityType
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[Tuple[Any, ...]]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[Tuple[Any, ...]]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
path: AbstractEntityRegistry,
loadopt: Optional[_LoadElement],
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[Tuple[Any, ...]]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import SelectState
from ..util import EMPTY_DICT
+from ..util.typing import Unpack
+
if TYPE_CHECKING:
from ._typing import _IdentityKeyType
_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
-def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
+def instances(
+ cursor: CursorResult[Unpack[Tuple[Any, ...]]], context: QueryContext
+) -> Result[Unpack[Tuple[Any, ...]]]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
from ..util import warn
from ..util import warn_deprecated
from ..util.typing import Protocol
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _EntityType
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
r"""Execute a SQL expression construct.
.. container:: class_bases
from ..util import IdentitySet
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from ._typing import _EntityType
from ..sql.selectable import TypedReturnsRows
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
__all__ = [
"Session",
params: Optional[_CoreAnyExecuteParams] = None,
execution_options: Optional[OrmExecuteOptionsParameter] = None,
bind_arguments: Optional[_BindArguments] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
"""Execute the statement represented by this
:class:`.ORMExecuteState`, without re-invoking events that have
already proceeded.
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
_scalar_result: bool = ...,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
...
def _execute_internal(
)
for idx, fn in enumerate(events_todo):
orm_exec_state._starting_event_idx = idx
- fn_result: Optional[Result[Any]] = fn(orm_exec_state)
+ fn_result: Optional[Result[Unpack[Tuple[Any, ...]]]] = fn(
+ orm_exec_state
+ )
if fn_result:
if _scalar_result:
return fn_result.scalar()
)
if compile_state_cls:
- result: Result[Any] = compile_state_cls.orm_execute_statement(
+ result: Result[
+ Unpack[Tuple[Any, ...]]
+ ] = compile_state_cls.orm_execute_statement(
self,
statement,
params or {},
@overload
def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Tuple[Unpack[_Ts]]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[_T]:
+ ) -> Result[Unpack[_Ts]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[Tuple[Any, ...]]]:
r"""Execute a SQL expression construct.
Returns a :class:`_engine.Result` object representing
from ..util import TypingOnly
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from ._typing import _ColumnExpressionArgument
connection: Connection,
distilled_params: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> Result[Any]:
+ ) -> Result[Unpack[typing_Tuple[Any, ...]]]:
if self.supports_execution:
if TYPE_CHECKING:
assert isinstance(self, Executable)