version 2.1.
+.. _change_10635:
+
+``Row`` now represents individual column types directly without ``Tuple``
+--------------------------------------------------------------------------
+
+SQLAlchemy 2.0 implemented a broad array of :pep:`484` typing throughout
+all components, including a new ability for row-returning statements such
+as :func:`_sql.select` to keep track of individual column types, which
+were then passed through the execution phase onto the :class:`_engine.Result`
+object and then to the individual :class:`_engine.Row` objects. Described
+at :ref:`change_result_typing_20`, this approach solved several issues
+with statement / row typing, but some remained unresolved. In 2.1, one
+of those issues, namely that the individual column types needed to be packaged
+into a ``typing.Tuple``, is now resolved using new :pep:`646` integration,
+which allows for tuple-like types that are not actually typed as ``Tuple``.
+
+In SQLAlchemy 2.0, a statement such as::
+
+ stmt = select(column("x", Integer), column("y", String))
+
+Would be typed as::
+
+ Select[Tuple[int, str]]
+
+In 2.1, it's now typed as::
+
+ Select[int, str]
+
+When executing ``stmt``, the :class:`_engine.Result` and :class:`_engine.Row`
+objects will be typed as ``Result[int, str]`` and ``Row[int, str]``, respectively.
+The prior workaround of using :attr:`_engine.Row._t` to type the row as a real
+``Tuple`` is no longer needed, and projects can migrate away from this pattern.
+
+Mypy users will need to make use of **Mypy 1.7 or greater** for pep-646
+integration to be available.
+
+Limitations
+^^^^^^^^^^^
+
+Not yet solved by pep-646 or any other pep is the ability for an arbitrary
+number of expressions within :class:`_sql.Select` and others to be mapped to
+row objects, without stating each argument position explicitly within typing
+annotations. To work around this issue, SQLAlchemy makes use of automated
+"stub generation" tools to generate hardcoded overloads that map different
+numbers of positional arguments passed to constructs like :func:`_sql.select`
+to individual ``Unpack[]`` expressions (in SQLAlchemy 2.0, this generation
+produced ``Tuple[]`` annotations instead). This means that there is an
+arbitrary limit on how many specific column expressions will be individually
+typed within the :class:`_engine.Row` object before remaining expressions
+fall back to ``Any``; for :func:`_sql.select`, it's currently ten expressions,
+and for DML expressions like :func:`_dml.insert` that use
+:meth:`_dml.Insert.returning`, it's eight. If and when a new pep that
+provides a ``Map`` operator for pep-646 is accepted, this limitation can be
+lifted. [1]_ Originally, it was mistakenly assumed that this limitation
+prevented pep-646 from being usable at all; however, the ``Unpack`` construct
+does in fact replace everything that was done using ``Tuple`` in 2.0.
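+
+For illustration, the generated overloads follow roughly the pattern below;
+this is a simplified, conceptual sketch only, as the real overloads are emitted
+by the stub generation tooling and make use of SQLAlchemy's internal typing
+helpers such as ``_TCCA``::
+
+    @overload
+    def select(__ent0: _TCCA[_T0], /) -> Select[_T0]: ...
+
+    @overload
+    def select(__ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /) -> Select[_T0, _T1]: ...
+
+    # ... additional hardcoded overloads continue up to ten entities,
+    # after which remaining elements are no longer individually typed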
+
+An additional limitation for which there is no proposed solution is that
+there's no way for the name-based attributes on :class:`_engine.Row` to be
+automatically typed, so these continue to be typed as ``Any`` (e.g. ``row.x``
+and ``row.y`` for the above example). With current language features,
+this could only be fixed by having an explicit class-based construct that
+allows one to compose an explicit :class:`_engine.Row` with explicit fields
+up front, which would be verbose and not automatic.
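+
+For example, continuing with the hypothetical ``connection`` used earlier,
+positional access carries the individual column types while name-based
+attribute access does not::
+
+    row = connection.execute(stmt).one()
+
+    # row[0] is typed as int and row[1] as str via the tuple-like
+    # behavior of Row; row.x and row.y remain typed as Any
+    total = row[0] + 1
+    label = row.x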
+
+.. [1] https://github.com/python/typing/discussions/1001#discussioncomment-1897813
+
+:ticket:`10635`
+
+
.. _change_10197:
Asyncio "greenlet" dependency no longer installs by default
be imported only when the asyncio extension is first imported.
Alternatively, the ``greenlet`` library is still imported lazily on
first use to support use cases that don't make direct use of the
- SQLAlchemy asyncio extension.
\ No newline at end of file
+ SQLAlchemy asyncio extension.
--- /dev/null
+.. change::
+ :tags: typing, feature
+ :tickets: 10635
+
+ The :class:`.Row` object now no longer makes use of an intermediary
+ ``Tuple`` in order to represent its individual element types; instead,
+ the individual element types are present directly, via new :pep:`646`
+ integration, now available in more recent versions of Mypy. Mypy
+ 1.7 or greater is now required for statements, results and rows
+ to be correctly typed. Pull request courtesy Yurii Karabas.
+
+ .. seealso::
+
+ :ref:`change_10635`
for the 2.0 series. Typing details are subject to change; however,
significant backwards-incompatible changes are not planned.
+.. _change_result_typing_20:
SQL Expression / Statement / Result Set Typing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
from typing import Tuple
from typing import Type
+from ..util.typing import TupleAny
+
if typing.TYPE_CHECKING:
from .result import _KeyType
from .result import _ProcessorsType
- from .result import _RawRowType
from .result import _TupleGetterType
from .result import ResultMetaData
_parent: ResultMetaData
_key_to_index: Mapping[_KeyType, int]
- _data: _RawRowType
+ _data: TupleAny
def __init__(
self,
parent: ResultMetaData,
processors: Optional[_ProcessorsType],
key_to_index: Mapping[_KeyType, int],
- data: _RawRowType,
+ data: TupleAny,
):
"""Row objects are constructed by CursorResult objects."""
object.__setattr__(self, "_parent", parent)
from .. import util
from ..sql import compiler
from ..sql import util as sql_util
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from . import CursorResult
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
@overload
def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[_T]:
+ ) -> CursorResult[Unpack[_Ts]]:
...
@overload
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
def execute(
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
func: FunctionElement[Any],
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(
ddl: ExecutableDDLElement,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a schema.DDL object."""
exec_opts = ddl._execution_options.merge_with(
elem: Executable,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.ClauseElement object."""
execution_options = elem._execution_options.merge_with(
compiled: Compiled,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.Compiled object.
TODO: why do we have this? likely deprecate or remove
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a string SQL statement on the DBAPI cursor directly,
without any SQL compilation steps.
execution_options: _ExecuteOptions,
*args: Any,
**kw: Any,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
context: ExecutionContext,
statement: Union[str, Compiled],
parameters: Optional[_AnyMultiExecuteParams],
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""continue the _execute_context() method for a single DBAPI
cursor.execute() or cursor.executemany() call.
self,
dialect: Dialect,
context: ExecutionContext,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""continue the _execute_context() method for an "insertmanyvalues"
operation, which will invoke DBAPI
cursor.execute() one or more times with individual log and
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
-from typing import TypeVar
from typing import Union
from .result import IteratorResult
from ..util import compat
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from ..sql.type_api import _ResultProcessorType
-_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
# metadata entry tuple indexes.
def __init__(
self,
- parent: CursorResult[Any],
+ parent: CursorResult[Unpack[TupleAny]],
cursor_description: _DBAPICursorDescription,
):
context = parent.context
alternate_cursor_description: Optional[_DBAPICursorDescription] = None
def soft_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ self,
+ result: CursorResult[Unpack[TupleAny]],
+ dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def hard_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ self,
+ result: CursorResult[Unpack[TupleAny]],
+ dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def yield_per(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
num: int,
) -> None:
def fetchone(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
hard_close: bool = False,
) -> Any:
def fetchmany(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
size: Optional[int] = None,
) -> Any:
def fetchall(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
) -> Any:
raise NotImplementedError()
def handle_exception(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
err: BaseException,
) -> NoReturn:
return it
-class CursorResult(Result[_T]):
+class CursorResult(Result[Unpack[_Ts]]):
"""A Result that is representing state from a DBAPI cursor.
.. versionchanged:: 1.4 The :class:`.CursorResult`
def _raw_row_iterator(self):
return self._fetchiter_impl()
- def merge(self, *others: Result[Any]) -> MergedResult[Any]:
+ def merge(
+ self, *others: Result[Unpack[TupleAny]]
+ ) -> MergedResult[Unpack[TupleAny]]:
merged_result = super().merge(*others)
setup_rowcounts = self.context._has_rowcount
if setup_rowcounts:
from ..sql.compiler import SQLCompiler
from ..sql.elements import quoted_name
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from types import ModuleType
result_column_struct: Optional[
Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
- returned_default_rows: Optional[Sequence[Row[Any]]] = None
+ returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
execution_options: _ExecuteOptions = util.EMPTY_DICT
from .. import event
from .. import exc
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from .interfaces import _CoreMultiExecuteParams
multiparams: _CoreMultiExecuteParams,
params: _CoreSingleExecuteParams,
execution_options: _ExecuteOptions,
- result: Result[Any],
+ result: Result[Unpack[TupleAny]],
) -> None:
"""Intercept high level execute() events after execute.
from ..sql.base import _generative
from ..sql.base import HasMemoized
from ..sql.base import InPlaceGenerative
+from ..util import deprecated
from ..util import HasMemoized_ro_memoized_attribute
from ..util import NONE_SET
from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import tuplegetter as tuplegetter
_KeyMapType = Mapping[_KeyType, _KeyMapRecType]
-_RowData = Union[Row[Any], RowMapping, Any]
+_RowData = Union[Row[Unpack[TupleAny]], RowMapping, Any]
"""A generic form of "row" that accommodates for the different kinds of
"rows" that different result objects return, including row, row mapping, and
scalar values"""
-_RawRowType = Tuple[Any, ...]
-"""represents the kind of row we get from a DBAPI cursor"""
_R = TypeVar("_R", bound=_RowData)
_T = TypeVar("_T", bound=Any)
-_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
-_InterimRowType = Union[_R, _RawRowType]
+_InterimRowType = Union[_R, TupleAny]
"""a catchall "anything" kind of return type that can be applied
across all the result types
"""
-_InterimSupportsScalarsRowType = Union[Row[Any], Any]
+_InterimSupportsScalarsRowType = Union[Row[Unpack[TupleAny]], Any]
_ProcessorsType = Sequence[Optional["_ResultProcessorType[Any]"]]
_TupleGetterType = Callable[[Sequence[Any]], Sequence[Any]]
def _getter(
self, key: Any, raiseerr: bool = True
- ) -> Optional[Callable[[Row[Any]], Any]]:
+ ) -> Optional[Callable[[Row[Unpack[TupleAny]]], Any]]:
index = self._index_for_key(key, raiseerr)
if index is not None:
def result_tuple(
fields: Sequence[str], extra: Optional[Any] = None
-) -> Callable[[Iterable[Any]], Row[Any]]:
+) -> Callable[[Iterable[Any]], Row[Unpack[TupleAny]]]:
parent = SimpleResultMetaData(fields, extra)
return functools.partial(
Row, parent, parent._effective_processors, parent._key_to_index
class ResultInternal(InPlaceGenerative, Generic[_R]):
__slots__ = ()
- _real_result: Optional[Result[Any]] = None
+ _real_result: Optional[Result[Unpack[TupleAny]]] = None
_generate_rows: bool = True
_row_logging_fn: Optional[Callable[[Any], Any]]
_source_supports_scalars: bool
- def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
+ def _fetchiter_impl(
+ self,
+ ) -> Iterator[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Any]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Any]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
- def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
+ def _fetchall_impl(
+ self,
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _soft_close(self, hard: bool = False) -> None:
@HasMemoized_ro_memoized_attribute
def _row_getter(self) -> Optional[Callable[..., _R]]:
- real_result: Result[Any] = (
+ real_result: Result[Unpack[TupleAny]] = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if real_result._source_supports_scalars:
processors: Optional[_ProcessorsType],
key_to_index: Mapping[_KeyType, int],
scalar_obj: Any,
- ) -> Row[Any]:
+ ) -> Row[Unpack[TupleAny]]:
return _proc(
metadata, processors, key_to_index, (scalar_obj,)
)
fixed_tf = tf
- def make_row(row: _InterimRowType[Row[Any]]) -> _R:
+ def make_row(row: _InterimRowType[Row[Unpack[TupleAny]]]) -> _R:
return _make_row_orig(fixed_tf(row))
else:
_log_row = real_result._row_logging_fn
_make_row = make_row
- def make_row(row: _InterimRowType[Row[Any]]) -> _R:
+ def make_row(row: _InterimRowType[Row[Unpack[TupleAny]]]) -> _R:
return _log_row(_make_row(row)) # type: ignore
return make_row
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def iterrows(self: Result[Any]) -> Iterator[_R]:
+ def iterrows(self: Result[Unpack[TupleAny]]) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
obj: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
else:
- def iterrows(self: Result[Any]) -> Iterator[_R]:
+ def iterrows(self: Result[Unpack[TupleAny]]) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
row: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
+ def onerow(self: Result[Unpack[TupleAny]]) -> Union[_NoRow, _R]:
_onerow = self._fetchone_impl
while True:
row = _onerow()
else:
- def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
+ def onerow(self: Result[Unpack[TupleAny]]) -> Union[_NoRow, _R]:
row = self._fetchone_impl()
if row is None:
return _NO_ROW
real_result = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if real_result._yield_per:
num_required = num = real_result._yield_per
real_result = (
self._real_result
if self._real_result
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
num = real_result._yield_per
real_result = (
self._real_result
if self._real_result is not None
- else cast("Result[Any]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if not strategy and self._metadata._unique_filters:
return self._metadata.keys
-class Result(_WithKeys, ResultInternal[Row[_TP]]):
+class Result(_WithKeys, ResultInternal[Row[Unpack[_Ts]]]):
"""Represent a set of database results.
.. versionadded:: 1.4 The :class:`_engine.Result` object provides a
__slots__ = ("_metadata", "__dict__")
- _row_logging_fn: Optional[Callable[[Row[Any]], Row[Any]]] = None
+ _row_logging_fn: Optional[
+ Callable[[Row[Unpack[TupleAny]]], Row[Unpack[TupleAny]]]
+ ] = None
_source_supports_scalars: bool = False
return self._column_slices(col_expressions)
@overload
- def scalars(self: Result[Tuple[_T]]) -> ScalarResult[_T]:
+ def scalars(self: Result[_T, Unpack[TupleAny]]) -> ScalarResult[_T]:
...
@overload
def scalars(
- self: Result[Tuple[_T]], index: Literal[0]
+ self: Result[_T, Unpack[TupleAny]], index: Literal[0]
) -> ScalarResult[_T]:
...
def _getter(
self, key: _KeyIndexType, raiseerr: bool = True
- ) -> Optional[Callable[[Row[Any]], Any]]:
+ ) -> Optional[Callable[[Row[Unpack[TupleAny]]], Any]]:
"""return a callable that will retrieve the given key from a
:class:`_engine.Row`.
return MappingResult(self)
@property
- def t(self) -> TupleResult[_TP]:
+ @deprecated(
+ "2.1.0",
+ "The :attr:`.Result.t` attribute is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
+ def t(self) -> TupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
The :attr:`_engine.Result.t` attribute is a synonym for
.. versionadded:: 2.0
+ .. seealso::
+
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
"""
return self # type: ignore
- def tuples(self) -> TupleResult[_TP]:
+ @deprecated(
+ "2.1.0",
+ "The :meth:`.Result.tuples` method is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
+ def tuples(self) -> TupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
This method returns the same :class:`_engine.Result` object
.. seealso::
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
:attr:`_engine.Result.t` - shorter synonym
:attr:`_engine.Row._t` - :class:`_engine.Row` version
"""
raise NotImplementedError()
- def __iter__(self) -> Iterator[Row[_TP]]:
+ def __iter__(self) -> Iterator[Row[Unpack[_Ts]]]:
return self._iter_impl()
- def __next__(self) -> Row[_TP]:
+ def __next__(self) -> Row[Unpack[_Ts]]:
return self._next_impl()
def partitions(
self, size: Optional[int] = None
- ) -> Iterator[Sequence[Row[_TP]]]:
+ ) -> Iterator[Sequence[Row[Unpack[_Ts]]]]:
"""Iterate through sub-lists of rows of the size given.
Each list will be of the size given, excluding the last list to
else:
break
- def fetchall(self) -> Sequence[Row[_TP]]:
+ def fetchall(self) -> Sequence[Row[Unpack[_Ts]]]:
"""A synonym for the :meth:`_engine.Result.all` method."""
return self._allrows()
- def fetchone(self) -> Optional[Row[_TP]]:
+ def fetchone(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch one row.
When all rows are exhausted, returns None.
else:
return row
- def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[_TP]]:
+ def fetchmany(
+ self, size: Optional[int] = None
+ ) -> Sequence[Row[Unpack[_Ts]]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty sequence.
return self._manyrow_getter(self, size)
- def all(self) -> Sequence[Row[_TP]]:
+ def all(self) -> Sequence[Row[Unpack[_Ts]]]:
"""Return all rows in a sequence.
Closes the result set after invocation. Subsequent invocations
return self._allrows()
- def first(self) -> Optional[Row[_TP]]:
+ def first(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch the first row or ``None`` if no row is present.
Closes the result set and discards remaining rows.
raise_for_second_row=False, raise_for_none=False, scalar=False
)
- def one_or_none(self) -> Optional[Row[_TP]]:
+ def one_or_none(self) -> Optional[Row[Unpack[_Ts]]]:
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
)
@overload
- def scalar_one(self: Result[Tuple[_T]]) -> _T:
+ def scalar_one(self: Result[_T]) -> _T:
...
@overload
)
@overload
- def scalar_one_or_none(self: Result[Tuple[_T]]) -> Optional[_T]:
+ def scalar_one_or_none(self: Result[_T]) -> Optional[_T]:
...
@overload
raise_for_second_row=True, raise_for_none=False, scalar=True
)
- def one(self) -> Row[_TP]:
+ def one(self) -> Row[Unpack[_Ts]]:
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
)
@overload
- def scalar(self: Result[Tuple[_T]]) -> Optional[_T]:
+ def scalar(self: Result[_T]) -> Optional[_T]:
...
@overload
raise_for_second_row=False, raise_for_none=False, scalar=True
)
- def freeze(self) -> FrozenResult[_TP]:
+ def freeze(self) -> FrozenResult[Unpack[_Ts]]:
"""Return a callable object that will produce copies of this
:class:`_engine.Result` when invoked.
return FrozenResult(self)
- def merge(self, *others: Result[Any]) -> MergedResult[_TP]:
+ def merge(
+ self, *others: Result[Unpack[TupleAny]]
+ ) -> MergedResult[Unpack[TupleAny]]:
"""Merge this :class:`_engine.Result` with other compatible result
objects.
_post_creational_filter: Optional[Callable[[Any], Any]]
- _real_result: Result[Any]
+ _real_result: Result[Unpack[TupleAny]]
def __enter__(self) -> Self:
return self
def _attributes(self) -> Dict[Any, Any]:
return self._real_result._attributes
- def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
+ def _fetchiter_impl(
+ self,
+ ) -> Iterator[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchiter_impl()
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Any]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchone_impl(hard_close=hard_close)
- def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
+ def _fetchall_impl(
+ self,
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchall_impl()
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Any]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchmany_impl(size=size)
_post_creational_filter: Optional[Callable[[Any], Any]]
- def __init__(self, real_result: Result[Any], index: _KeyIndexType):
+ def __init__(
+ self, real_result: Result[Unpack[TupleAny]], index: _KeyIndexType
+ ):
self._real_result = real_result
if real_result._source_supports_scalars:
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result[Any]):
+ def __init__(self, result: Result[Unpack[TupleAny]]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
)
-class FrozenResult(Generic[_TP]):
+class FrozenResult(Generic[Unpack[_Ts]]):
"""Represents a :class:`_engine.Result` object in a "frozen" state suitable
for caching.
data: Sequence[Any]
- def __init__(self, result: Result[_TP]):
+ def __init__(self, result: Result[Unpack[_Ts]]):
self.metadata = result._metadata._for_freeze()
self._source_supports_scalars = result._source_supports_scalars
self._attributes = result._attributes
return [list(row) for row in self.data]
def with_new_rows(
- self, tuple_data: Sequence[Row[_TP]]
- ) -> FrozenResult[_TP]:
+ self, tuple_data: Sequence[Row[Unpack[_Ts]]]
+ ) -> FrozenResult[Unpack[_Ts]]:
fr = FrozenResult.__new__(FrozenResult)
fr.metadata = self.metadata
fr._attributes = self._attributes
fr._source_supports_scalars = self._source_supports_scalars
if self._source_supports_scalars:
- fr.data = [d[0] for d in tuple_data]
+ fr.data = [d[0] for d in tuple_data] # type: ignore[misc]
else:
fr.data = tuple_data
return fr
- def __call__(self) -> Result[_TP]:
- result: IteratorResult[_TP] = IteratorResult(
+ def __call__(self) -> Result[Unpack[_Ts]]:
+ result: IteratorResult[Unpack[_Ts]] = IteratorResult(
self.metadata, iter(self.data)
)
result._attributes = self._attributes
return result
-class IteratorResult(Result[_TP]):
+class IteratorResult(Result[Unpack[_Ts]]):
"""A :class:`_engine.Result` that gets data from a Python iterator of
:class:`_engine.Row` objects or similar row-like data.
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Any]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
else:
return row
- def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
+ def _fetchall_impl(
+ self,
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
try:
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Any]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
return IteratorResult(SimpleResultMetaData([]), iter([]))
-class ChunkedIteratorResult(IteratorResult[_TP]):
+class ChunkedIteratorResult(IteratorResult[Unpack[_Ts]]):
"""An :class:`_engine.IteratorResult` that works from an
iterator-producing callable.
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Any]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self.dynamic_yield_per:
self.iterator = itertools.chain.from_iterable(self.chunks(size))
return super()._fetchmany_impl(size=size)
-class MergedResult(IteratorResult[_TP]):
+class MergedResult(IteratorResult[Unpack[_Ts]]):
"""A :class:`_engine.Result` that is merged from any number of
:class:`_engine.Result` objects.
rowcount: Optional[int]
def __init__(
- self, cursor_metadata: ResultMetaData, results: Sequence[Result[_TP]]
+ self,
+ cursor_metadata: ResultMetaData,
+ results: Sequence[Result[Unpack[_Ts]]],
):
self._results = results
super().__init__(
from typing import Mapping
from typing import NoReturn
from typing import Optional
-from typing import overload
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
-from typing import Union
from ..sql import util as sql_util
from ..util import deprecated
from ..util._has_cy import HAS_CYEXTENSION
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import BaseRow as BaseRow
from sqlalchemy.cyextension.resultproxy import BaseRow as BaseRow
if TYPE_CHECKING:
+ from typing import Tuple as _RowBase
+
from .result import _KeyType
from .result import _ProcessorsType
from .result import RMKeyView
+else:
+ _RowBase = Sequence
+
_T = TypeVar("_T", bound=Any)
-_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
-class Row(BaseRow, Sequence[Any], Generic[_TP]):
+class Row(BaseRow, _RowBase[Unpack[_Ts]], Generic[Unpack[_Ts]]):
"""Represent a single result row.
The :class:`.Row` object represents a row of a database result. It is
def __delattr__(self, name: str) -> NoReturn:
raise AttributeError("can't delete attribute")
- def _tuple(self) -> _TP:
+ @deprecated(
+ "2.1.0",
+ "The :meth:`.Row._tuple` method is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
+ def _tuple(self) -> Tuple[Unpack[_Ts]]:
"""Return a 'tuple' form of this :class:`.Row`.
At runtime, this method returns "self"; the :class:`.Row` object is
.. seealso::
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
:attr:`.Row._t` - shorthand attribute notation
:meth:`.Result.tuples`
"""
- return self # type: ignore
+ return self
@deprecated(
"2.0.19",
"methods and library-level attributes are intended to be underscored "
"to avoid name conflicts. Please use :meth:`Row._tuple`.",
)
- def tuple(self) -> _TP:
+ def tuple(self) -> Tuple[Unpack[_Ts]]:
"""Return a 'tuple' form of this :class:`.Row`.
.. versionadded:: 2.0
+ .. seealso::
+
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
"""
return self._tuple()
@property
- def _t(self) -> _TP:
+ @deprecated(
+ "2.1.0",
+ "The :attr:`.Row._t` attribute is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
+ def _t(self) -> Tuple[Unpack[_Ts]]:
"""A synonym for :meth:`.Row._tuple`.
.. versionadded:: 2.0.19 - The :attr:`.Row._t` attribute supersedes
.. seealso::
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
:attr:`.Result.t`
"""
- return self # type: ignore
+ return self
@property
@deprecated(
"methods and library-level attributes are intended to be underscored "
"to avoid name conflicts. Please use :attr:`Row._t`.",
)
- def t(self) -> _TP:
+ def t(self) -> Tuple[Unpack[_Ts]]:
"""A synonym for :meth:`.Row._tuple`.
.. versionadded:: 2.0
+ .. seealso::
+
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
"""
return self._t
def _filter_on_values(
self, processor: Optional[_ProcessorsType]
- ) -> Row[Any]:
+ ) -> Row[Unpack[_Ts]]:
return Row(self._parent, processor, self._key_to_index, self._data)
if not TYPE_CHECKING:
__hash__ = BaseRow.__hash__
- if TYPE_CHECKING:
-
- @overload
- def __getitem__(self, index: int) -> Any:
- ...
-
- @overload
- def __getitem__(self, index: slice) -> Sequence[Any]:
- ...
-
- def __getitem__(self, index: Union[int, slice]) -> Any:
- ...
-
def __lt__(self, other: Any) -> bool:
return self._op(other, operator.lt)
from typing import NoReturn
from typing import Optional
from typing import overload
-from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from ...engine.base import Transaction
from ...exc import ArgumentError
from ...util.concurrency import greenlet_spawn
+from ...util.typing import TupleAny
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
if TYPE_CHECKING:
from ...engine.cursor import CursorResult
from ...sql.selectable import TypedReturnsRows
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
@overload
def stream(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[_T]]:
+ ) -> GeneratorStartableContext[AsyncResult[Unpack[_Ts]]]:
...
@overload
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> GeneratorStartableContext[AsyncResult[Any]]:
+ ) -> GeneratorStartableContext[AsyncResult[Unpack[TupleAny]]]:
...
@asyncstartablecontext
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> AsyncIterator[AsyncResult[Any]]:
+ ) -> AsyncIterator[AsyncResult[Unpack[TupleAny]]]:
"""Execute a statement and return an awaitable yielding a
:class:`_asyncio.AsyncResult` object.
@overload
async def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[_T]:
+ ) -> CursorResult[Unpack[_Ts]]:
...
@overload
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
async def execute(
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
@overload
async def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
async def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
@overload
def stream_scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
parameters: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
from ...engine.row import Row
from ...engine.row import RowMapping
from ...sql.base import _generative
+from ...util import deprecated
from ...util.concurrency import greenlet_spawn
from ...util.typing import Literal
from ...util.typing import Self
+from ...util.typing import TupleAny
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
if TYPE_CHECKING:
from ...engine import CursorResult
from ...engine.result import _UniqueFilterType
_T = TypeVar("_T", bound=Any)
-_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
class AsyncCommon(FilterResult[_R]):
__slots__ = ()
- _real_result: Result[Any]
+ _real_result: Result[Unpack[TupleAny]]
_metadata: ResultMetaData
async def close(self) -> None: # type: ignore[override]
return self._real_result.closed
-class AsyncResult(_WithKeys, AsyncCommon[Row[_TP]]):
+class AsyncResult(_WithKeys, AsyncCommon[Row[Unpack[_Ts]]]):
"""An asyncio wrapper around a :class:`_result.Result` object.
The :class:`_asyncio.AsyncResult` only applies to statement executions that
__slots__ = ()
- _real_result: Result[_TP]
+ _real_result: Result[Unpack[_Ts]]
- def __init__(self, real_result: Result[_TP]):
+ def __init__(self, real_result: Result[Unpack[_Ts]]):
self._real_result = real_result
self._metadata = real_result._metadata
)
@property
- def t(self) -> AsyncTupleResult[_TP]:
+ @deprecated(
+ "2.1.0",
+ "The :attr:`.AsyncResult.t` attribute is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
+ def t(self) -> AsyncTupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
The :attr:`_asyncio.AsyncResult.t` attribute is a synonym for
.. versionadded:: 2.0
+ .. seealso::
+
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
"""
return self # type: ignore
- def tuples(self) -> AsyncTupleResult[_TP]:
+ @deprecated(
+ "2.1.0",
+ "The :meth:`.AsyncResult.tuples` method is deprecated, "
+ ":class:`.Row` now behaves like a tuple and can unpack types "
+ "directly.",
+ )
+ def tuples(self) -> AsyncTupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
This method returns the same :class:`_asyncio.AsyncResult` object
.. seealso::
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
:attr:`_asyncio.AsyncResult.t` - shorter synonym
:attr:`_engine.Row.t` - :class:`_engine.Row` version
async def partitions(
self, size: Optional[int] = None
- ) -> AsyncIterator[Sequence[Row[_TP]]]:
+ ) -> AsyncIterator[Sequence[Row[Unpack[_Ts]]]]:
"""Iterate through sub-lists of rows of the size given.
An async iterator is returned::
else:
break
- async def fetchall(self) -> Sequence[Row[_TP]]:
+ async def fetchall(self) -> Sequence[Row[Unpack[_Ts]]]:
"""A synonym for the :meth:`_asyncio.AsyncResult.all` method.
.. versionadded:: 2.0
return await greenlet_spawn(self._allrows)
- async def fetchone(self) -> Optional[Row[_TP]]:
+ async def fetchone(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch one row.
When all rows are exhausted, returns None.
async def fetchmany(
self, size: Optional[int] = None
- ) -> Sequence[Row[_TP]]:
+ ) -> Sequence[Row[Unpack[_Ts]]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
return await greenlet_spawn(self._manyrow_getter, self, size)
- async def all(self) -> Sequence[Row[_TP]]:
+ async def all(self) -> Sequence[Row[Unpack[_Ts]]]:
"""Return all rows in a list.
Closes the result set after invocation. Subsequent invocations
return await greenlet_spawn(self._allrows)
- def __aiter__(self) -> AsyncResult[_TP]:
+ def __aiter__(self) -> AsyncResult[Unpack[_Ts]]:
return self
- async def __anext__(self) -> Row[_TP]:
+ async def __anext__(self) -> Row[Unpack[_Ts]]:
row = await greenlet_spawn(self._onerow_getter, self)
if row is _NO_ROW:
raise StopAsyncIteration()
else:
return row
- async def first(self) -> Optional[Row[_TP]]:
+ async def first(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch the first row or ``None`` if no row is present.
Closes the result set and discards remaining rows.
"""
return await greenlet_spawn(self._only_one_row, False, False, False)
- async def one_or_none(self) -> Optional[Row[_TP]]:
+ async def one_or_none(self) -> Optional[Row[Unpack[_Ts]]]:
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
return await greenlet_spawn(self._only_one_row, True, False, False)
@overload
- async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T:
+ async def scalar_one(self: AsyncResult[_T]) -> _T:
...
@overload
@overload
async def scalar_one_or_none(
- self: AsyncResult[Tuple[_T]],
+ self: AsyncResult[_T],
) -> Optional[_T]:
...
"""
return await greenlet_spawn(self._only_one_row, True, False, True)
- async def one(self) -> Row[_TP]:
+ async def one(self) -> Row[Unpack[_Ts]]:
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
return await greenlet_spawn(self._only_one_row, True, True, False)
@overload
- async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]:
+ async def scalar(self: AsyncResult[_T]) -> Optional[_T]:
...
@overload
"""
return await greenlet_spawn(self._only_one_row, False, False, True)
- async def freeze(self) -> FrozenResult[_TP]:
+ async def freeze(self) -> FrozenResult[Unpack[_Ts]]:
"""Return a callable object that will produce copies of this
:class:`_asyncio.AsyncResult` when invoked.
@overload
def scalars(
- self: AsyncResult[Tuple[_T]], index: Literal[0]
+ self: AsyncResult[_T, Unpack[TupleAny]], index: Literal[0]
) -> AsyncScalarResult[_T]:
...
@overload
- def scalars(self: AsyncResult[Tuple[_T]]) -> AsyncScalarResult[_T]:
+ def scalars(
+ self: AsyncResult[_T, Unpack[TupleAny]],
+ ) -> AsyncScalarResult[_T]:
...
@overload
_generate_rows = False
- def __init__(self, real_result: Result[Any], index: _KeyIndexType):
+ def __init__(
+ self,
+ real_result: Result[Unpack[TupleAny]],
+ index: _KeyIndexType,
+ ):
self._real_result = real_result
if real_result._source_supports_scalars:
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result[Any]):
+ def __init__(self, result: Result[Unpack[TupleAny]]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
...
-_RT = TypeVar("_RT", bound="Result[Any]")
+_RT = TypeVar("_RT", bound="Result[Unpack[TupleAny]]")
async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT:
from ...util import ScopedRegistry
from ...util import warn
from ...util import warn_deprecated
+from ...util.typing import TupleAny
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
if TYPE_CHECKING:
from .engine import AsyncConnection
from ...sql.selectable import TypedReturnsRows
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
@create_proxy_methods(
@overload
async def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[_T]:
+ ) -> Result[Unpack[_Ts]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@overload
async def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def stream(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[_T]:
+ ) -> AsyncResult[Unpack[_Ts]]:
...
@overload
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
...
async def stream(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
r"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@overload
async def stream_scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
from ...orm import SessionTransaction
from ...orm import state as _instance_state
from ...util.concurrency import greenlet_spawn
+from ...util.typing import TupleAny
+from ...util.typing import TypeVarTuple
+from ...util.typing import Unpack
+
if TYPE_CHECKING:
from .engine import AsyncConnection
_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"]
_T = TypeVar("_T", bound=Any)
-
+_Ts = TypeVarTuple("_Ts")
_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
@overload
async def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[_T]:
+ ) -> Result[Unpack[_Ts]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@overload
async def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
async def stream(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[_T]:
+ ) -> AsyncResult[Unpack[_Ts]]:
...
@overload
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
...
async def stream(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult[Any]:
+ ) -> AsyncResult[Unpack[TupleAny]]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@overload
async def stream_scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
from ..orm.session import _PKIdentityArgument
from ..orm.session import Session
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
if TYPE_CHECKING:
from ..engine.base import Connection
from ..engine.base import Engine
from ..engine.base import OptionEngine
- from ..engine.result import IteratorResult
from ..engine.result import Result
from ..orm import LoaderCallableStatus
from ..orm._typing import _O
from ..orm.session import ORMExecuteState
from ..orm.state import InstanceState
from ..sql import Executable
- from ..sql._typing import _TP
from ..sql.elements import ClauseElement
__all__ = ["ShardedSession", "ShardedQuery"]
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
ShardIdentifier = str
def execute_and_instances(
orm_context: ORMExecuteState,
-) -> Union[Result[_T], IteratorResult[_TP]]:
+) -> Result[Unpack[TupleAny]]:
active_options: Union[
None,
QueryContext.default_load_options,
def iter_for_shard(
shard_id: ShardIdentifier,
- ) -> Union[Result[_T], IteratorResult[_TP]]:
+ ) -> Result[Unpack[TupleAny]]:
bind_arguments = dict(orm_context.bind_arguments)
bind_arguments["shard_id"] = shard_id
from ..sql.dml import UpdateDMLState
from ..util import EMPTY_DICT
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import DMLStrategyArgument
update_changed_only: bool,
use_orm_update_stmt: Optional[dml.Update] = ...,
enable_check_rowcount: bool = True,
-) -> _result.Result[Any]:
+) -> _result.Result[Unpack[TupleAny]]:
...
update_changed_only: bool,
use_orm_update_stmt: Optional[dml.Update] = None,
enable_check_rowcount: bool = True,
-) -> Optional[_result.Result[Any]]:
+) -> Optional[_result.Result[Unpack[TupleAny]]]:
base_mapper = mapper.base_mapper
search_keys = mapper._primary_key_propkeys
"are 'raw', 'orm', 'bulk', 'auto"
)
- result: _result.Result[Any]
+ result: _result.Result[Unpack[TupleAny]]
if insert_options._dml_strategy == "raw":
result = conn.execute(
"are 'orm', 'auto', 'bulk', 'core_only'"
)
- result: _result.Result[Any]
+ result: _result.Result[Unpack[TupleAny]]
if update_options._dml_strategy == "bulk":
enable_check_rowcount = not statement._where_criteria
from ..sql import roles
from ..sql import util as sql_util
from ..sql import visitors
-from ..sql._typing import _TP
from ..sql._typing import is_dml
from ..sql._typing import is_insert_update
from ..sql._typing import is_select_base
from ..sql.selectable import SelectState
from ..sql.selectable import TypedReturnsRows
from ..sql.visitors import InternalTraversal
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
if TYPE_CHECKING:
from ._typing import _InternalEntityType
from ..sql.type_api import TypeEngine
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
_path_registry = PathRegistry.root
_EMPTY_DICT = util.immutabledict()
def __init__(
self,
compile_state: CompileState,
- statement: Union[Select[Any], FromStatement[Any]],
+ statement: Union[
+ Select[Unpack[TupleAny]],
+ FromStatement[Unpack[TupleAny]],
+ ],
params: _CoreSingleExecuteParams,
session: Session,
load_options: Union[
attributes: Dict[Any, Any]
global_attributes: Dict[Any, Any]
- statement: Union[Select[Any], FromStatement[Any]]
- select_statement: Union[Select[Any], FromStatement[Any]]
+ statement: Union[Select[Unpack[TupleAny]], FromStatement[Unpack[TupleAny]]]
+ select_statement: Union[
+ Select[Unpack[TupleAny]], FromStatement[Unpack[TupleAny]]
+ ]
_entities: List[_QueryEntity]
_polymorphic_adapters: Dict[_InternalEntityType, ORMAdapter]
compile_options: Union[
dedupe_columns: Set[ColumnElement[Any]]
create_eager_joins: List[
# TODO: this structure is set up by JoinedLoader
- Tuple[Any, ...]
+ TupleAny
]
current_path: PathRegistry = _path_registry
_has_mapper_entities = False
entity.setup_dml_returning_compile_state(self, adapter)
-class FromStatement(GroupedElement, Generative, TypedReturnsRows[_TP]):
+class FromStatement(GroupedElement, Generative, TypedReturnsRows[Unpack[_Ts]]):
"""Core construct that represents a load of ORM objects from various
:class:`.ReturnsRows` and other classes including:
def _legacy_filter_by_entity_zero(
- query_or_augmented_select: Union[Query[Any], Select[Any]]
+ query_or_augmented_select: Union[Query[Any], Select[Unpack[TupleAny]]]
) -> Optional[_InternalEntityType[Any]]:
self = query_or_augmented_select
if self._setup_joins:
def _entity_from_pre_ent_zero(
- query_or_augmented_select: Union[Query[Any], Select[Any]]
+ query_or_augmented_select: Union[Query[Any], Select[Unpack[TupleAny]]]
) -> Optional[_InternalEntityType[Any]]:
self = query_or_augmented_select
if not self._raw_columns:
from ..sql.elements import BindParameter
from ..util.typing import is_fwd_ref
from ..util.typing import is_pep593
+from ..util.typing import TupleAny
from ..util.typing import typing_get_args
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from ._typing import _InstanceDict
def create_row_processor(
self,
- query: Select[Any],
- procs: Sequence[Callable[[Row[Any]], Any]],
+ query: Select[Unpack[TupleAny]],
+ procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
labels: Sequence[str],
- ) -> Callable[[Row[Any]], Any]:
- def proc(row: Row[Any]) -> Any:
+ ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
+ def proc(row: Row[Unpack[TupleAny]]) -> Any:
return self.property.composite_class(
*[proc(row) for proc in procs]
)
from ..sql.type_api import TypeEngine
from ..util import warn_deprecated
from ..util.typing import RODescriptorReference
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from ._typing import _EntityType
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
path: AbstractEntityRegistry,
loadopt: Optional[_LoadElement],
mapper: Mapper[Any],
- result: Result[Any],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import SelectState
from ..util import EMPTY_DICT
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _IdentityKeyType
_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
-def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
+def instances(
+ cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
+) -> Result[Unpack[TupleAny]]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
from ..util import HasMemoized
from ..util import HasMemoized_ro_memoized_attribute
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _IdentityKeyType
def identity_key_from_row(
self,
- row: Optional[Union[Row[Any], RowMapping]],
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]],
identity_token: Optional[Any] = None,
adapter: Optional[ORMAdapter] = None,
) -> _IdentityKeyType[_O]:
from ..sql import util as sql_util
from ..sql import visitors
from ..sql._typing import _FromClauseArgument
-from ..sql._typing import _TP
from ..sql.annotation import SupportsCloneAnnotations
from ..sql.base import _entity_namespace_key
from ..sql.base import _generative
from ..sql.selectable import HasSuffixes
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import SelectLabelStyle
+from ..util import deprecated
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if TYPE_CHECKING:
__all__ = ["Query", "QueryContext"]
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
@inspection._self_inspects
for ent in util.to_list(entities)
]
+ @deprecated(
+ "2.1.0",
+ "The :meth:`.Query.tuples` method is deprecated, :class:`.Row` "
+ "now behaves like a tuple and can unpack types directly.",
+ )
def tuples(self: Query[_O]) -> Query[Tuple[_O]]:
"""return a tuple-typed form of this :class:`.Query`.
.. seealso::
+ :ref:`change_10635` - describes a migration path from this
+ workaround for SQLAlchemy 2.1.
+
:meth:`.Result.tuples` - v2 equivalent method.
"""
return stmt
- def _final_statement(self, legacy_query_style: bool = True) -> Select[Any]:
+ def _final_statement(
+ self, legacy_query_style: bool = True
+ ) -> Select[Unpack[TupleAny]]:
"""Return the 'final' SELECT statement for this :class:`.Query`.
This is used by the testing suite only and is fairly inefficient.
@overload
def only_return_tuples(
self: Query[_O], value: Literal[True]
- ) -> RowReturningQuery[Tuple[_O]]:
+ ) -> RowReturningQuery[_O]:
...
@overload
@overload
def with_entities(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
- ) -> RowReturningQuery[Tuple[_T0, _T1]]:
+ ) -> RowReturningQuery[_T0, _T1]:
...
@overload
def with_entities(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> RowReturningQuery[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.with_entities
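# Illustrative note, not part of the patch: the trailing ``*entities``
# overload above is the arbitrary-arity fallback.  The generated overloads
# cover up to eight explicitly-typed entities; a call supplying more than
# eight matches this last overload, so the first eight positions keep their
# individual types while any further entities are carried in the
# ``Unpack[TupleAny]`` tail and come back typed as ``Any``.  For example,
# with ``User`` a hypothetical mapped class having columns ``c1``..``c9``:
#
#     q = session.query(User).with_entities(
#         User.c1, User.c2, User.c3, User.c4, User.c5,
#         User.c6, User.c7, User.c8, User.c9,
#     )
#     # RowReturningQuery[<type of c1>, ..., <type of c8>,
#     #                   Unpack[tuple[Any, ...]]]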
"""BulkUD which handles DELETEs."""
-class RowReturningQuery(Query[Row[_TP]]):
+class RowReturningQuery(Query[Row[Unpack[_Ts]]]):
if TYPE_CHECKING:
- def tuples(self) -> Query[_TP]: # type: ignore
+ def tuples(self) -> Query[Tuple[Unpack[_Ts]]]: # type: ignore
...
from ..util import ThreadLocalRegistry
from ..util import warn
from ..util import warn_deprecated
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _EntityType
from ..sql.selectable import ForUpdateParameter
from ..sql.selectable import TypedReturnsRows
+
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
class QueryPropertyDescriptor(Protocol):
@overload
def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[_T]:
+ ) -> Result[Unpack[_Ts]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a SQL expression construct.
.. container:: class_bases
@overload
def query(
self, _colexpr: TypedColumnsClauseRole[_T]
- ) -> RowReturningQuery[Tuple[_T]]:
+ ) -> RowReturningQuery[_T]:
...
# START OVERLOADED FUNCTIONS self.query RowReturningQuery 2-8
@overload
def query(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
- ) -> RowReturningQuery[Tuple[_T0, _T1]]:
+ ) -> RowReturningQuery[_T0, _T1]:
...
@overload
def query(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> RowReturningQuery[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.query
@overload
def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..util import IdentitySet
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
if typing.TYPE_CHECKING:
from ._typing import _EntityType
from ..sql.selectable import TypedReturnsRows
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
__all__ = [
"Session",
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
"""Return an identity key.
params: Optional[_CoreAnyExecuteParams] = None,
execution_options: Optional[OrmExecuteOptionsParameter] = None,
bind_arguments: Optional[_BindArguments] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
"""Execute the statement represented by this
:class:`.ORMExecuteState`, without re-invoking events that have
already proceeded.
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
_scalar_result: bool = ...,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
...
def _execute_internal(
)
for idx, fn in enumerate(events_todo):
orm_exec_state._starting_event_idx = idx
- fn_result: Optional[Result[Any]] = fn(orm_exec_state)
+ fn_result: Optional[Result[Unpack[TupleAny]]] = fn(
+ orm_exec_state
+ )
if fn_result:
if _scalar_result:
return fn_result.scalar()
)
if compile_state_cls:
- result: Result[Any] = compile_state_cls.orm_execute_statement(
+ result: Result[
+ Unpack[TupleAny]
+ ] = compile_state_cls.orm_execute_statement(
self,
statement,
params or {},
@overload
def execute(
self,
- statement: TypedReturnsRows[_T],
+ statement: TypedReturnsRows[Unpack[_Ts]],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[_T]:
+ ) -> Result[Unpack[_Ts]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Any]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a SQL expression construct.
Returns a :class:`_engine.Result` object representing
@overload
def scalar(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreSingleExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def scalars(
self,
- statement: TypedReturnsRows[Tuple[_T]],
+ statement: TypedReturnsRows[_T],
params: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
@overload
def query(
self, _colexpr: TypedColumnsClauseRole[_T]
- ) -> RowReturningQuery[Tuple[_T]]:
+ ) -> RowReturningQuery[_T]:
...
# START OVERLOADED FUNCTIONS self.query RowReturningQuery 2-8
@overload
def query(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
- ) -> RowReturningQuery[Tuple[_T0, _T1]]:
+ ) -> RowReturningQuery[_T0, _T1]:
...
@overload
def query(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> RowReturningQuery[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> RowReturningQuery[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.query
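# Illustrative, runnable sketch, not part of the patch: the re-typed
# Session.query() end to end.  The ``User`` model, in-memory SQLite URL and
# sample data below are made up for this example.
from sqlalchemy import String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(name="spongebob"))
    session.commit()

    # typed RowReturningQuery[int, str] in 2.1
    # (was RowReturningQuery[Tuple[int, str]] in 2.0)
    query = session.query(User.id, User.name)

    # each Row[int, str] unpacks positionally; no Row._t step required
    for user_id, user_name in query:
        print(user_id, user_name)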
with_for_update = ForUpdateArg._from_argument(with_for_update)
- stmt: Select[Any] = sql.select(object_mapper(instance))
+ stmt: Select[Unpack[TupleAny]] = sql.select(object_mapper(instance))
if (
loading.load_on_ident(
self,
from .. import inspection
from .. import util
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _IdentityKeyType
"""
def __call__(
- self, state: InstanceState[_O], dict_: _InstanceDict, row: Row[Any]
+ self,
+ state: InstanceState[_O],
+ dict_: _InstanceDict,
+ row: Row[Unpack[TupleAny]],
) -> None:
...
fixed_impl = impl
def _set_callable(
- state: InstanceState[_O], dict_: _InstanceDict, row: Row[Any]
+ state: InstanceState[_O],
+ dict_: _InstanceDict,
+ row: Row[Unpack[TupleAny]],
) -> None:
if "callables" not in state.__dict__:
state.callables = {}
else:
def _set_callable(
- state: InstanceState[_O], dict_: _InstanceDict, row: Row[Any]
+ state: InstanceState[_O],
+ dict_: _InstanceDict,
+ row: Row[Unpack[TupleAny]],
) -> None:
if "callables" not in state.__dict__:
state.callables = {}
from ..util.typing import eval_name_only as _eval_name_only
from ..util.typing import is_origin_of_cls
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import typing_get_origin
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from ._typing import _EntityType
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[_T] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
r"""Generate "identity key" tuples, as are used as keys in the
def create_row_processor(
self,
- query: Select[Any],
- procs: Sequence[Callable[[Row[Any]], Any]],
+ query: Select[Unpack[TupleAny]],
+ procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
labels: Sequence[str],
- ) -> Callable[[Row[Any]], Any]:
+ ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
"""Produce the "row processing" function for this :class:`.Bundle`.
May be overridden by subclasses to provide custom behaviors when
"""
keyed_tuple = result_tuple(labels, [() for l in labels])
- def proc(row: Row[Any]) -> Any:
+ def proc(row: Row[Unpack[TupleAny]]) -> Any:
return keyed_tuple([proc(row) for proc in procs])
return proc
"produce a SQL statement and execute it with session.scalars()."
)
- def select(self) -> Select[Tuple[_T]]:
+ def select(self) -> Select[_T]:
"""Produce a :class:`_sql.Select` construct that represents the
rows within this instance-local :class:`_orm.WriteOnlyCollection`.
from typing import Any
from typing import Optional
from typing import overload
-from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
from .selectable import TableClause
from .selectable import TableSample
from .selectable import Values
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if TYPE_CHECKING:
from ._typing import _FromClauseArgument
@overload
-def select(__ent0: _TCCA[_T0], /) -> Select[Tuple[_T0]]:
+def select(__ent0: _TCCA[_T0], /) -> Select[_T0]:
...
@overload
-def select(
- __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
-) -> Select[Tuple[_T0, _T1]]:
+def select(__ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /) -> Select[_T0, _T1]:
...
@overload
def select(
__ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
-) -> Select[Tuple[_T0, _T1, _T2]]:
+) -> Select[_T0, _T1, _T2]:
...
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3]]:
+) -> Select[_T0, _T1, _T2, _T3]:
...
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+) -> Select[_T0, _T1, _T2, _T3, _T4]:
...
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+) -> Select[_T0, _T1, _T2, _T3, _T4, _T5]:
...
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
...
__ent7: _TCCA[_T7],
__ent8: _TCCA[_T8],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]]:
+) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8]:
...
__ent8: _TCCA[_T8],
__ent9: _TCCA[_T9],
/,
-) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9]]:
+ *entities: _ColumnsClauseArgument[Any],
+) -> Select[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, _T8, _T9, Unpack[TupleAny]
+]:
...
@overload
-def select(*entities: _ColumnsClauseArgument[Any], **__kw: Any) -> Select[Any]:
+def select(
+ *entities: _ColumnsClauseArgument[Any], **__kw: Any
+) -> Select[Unpack[TupleAny]]:
...
-def select(*entities: _ColumnsClauseArgument[Any], **__kw: Any) -> Select[Any]:
+def select(
+ *entities: _ColumnsClauseArgument[Any], **__kw: Any
+) -> Select[Unpack[TupleAny]]:
r"""Construct a new :class:`_expression.Select`.
from typing import overload
from typing import Protocol
from typing import Set
-from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from .. import util
from ..inspection import Inspectable
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import TypeAlias
+from ..util.typing import Unpack
if TYPE_CHECKING:
from datetime import date
Type[_T],
]
-_TP = TypeVar("_TP", bound=Tuple[Any, ...])
-
_T0 = TypeVar("_T0", bound=Any)
_T1 = TypeVar("_T1", bound=Any)
_T2 = TypeVar("_T2", bound=Any)
def is_select_statement(
t: Union[Executable, ReturnsRows]
- ) -> TypeGuard[Select[Any]]:
+ ) -> TypeGuard[Select[Unpack[TupleAny]]]:
...
def is_table(t: FromClause) -> TypeGuard[TableClause]:
from .. import util
from ..util import FastIntFlag
from ..util.typing import Literal
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from .annotation import _AnnotationDict
need_result_map_for_nested: bool
need_result_map_for_compound: bool
select_0: ReturnsRows
- insert_from_select: Select[Any]
+ insert_from_select: Select[Unpack[TupleAny]]
class ExpandedState(NamedTuple):
return text
def _setup_select_hints(
- self, select: Select[Any]
+ self, select: Select[Unpack[TupleAny]]
) -> Tuple[str, _FromHintsType]:
byfrom = {
from_: hinttext
from . import coercions
from . import roles
from . import util as sql_util
-from ._typing import _TP
from ._typing import _unexpected_kw
from ._typing import is_column_element
from ._typing import is_named_from_clause
from .. import exc
from .. import util
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeGuard
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
if TYPE_CHECKING:
from ._typing import _ColumnExpressionArgument
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
_DMLColumnElement = Union[str, ColumnClause[Any]]
_DMLTableElement = Union[TableClause, Alias, Join]
_supports_multi_parameters = False
- select: Optional[Select[Any]] = None
+ select: Optional[Select[Unpack[TupleAny]]] = None
"""SELECT statement for INSERT .. FROM SELECT"""
_post_values_clause: Optional[ClauseElement] = None
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0]]:
+ ) -> ReturningInsert[_T0]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1]]:
+ ) -> ReturningInsert[_T0, _T1]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2]]:
+ ) -> ReturningInsert[_T0, _T1, _T2]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> ReturningInsert[_T0, _T1, _T2, _T3]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> ReturningInsert[_T0, _T1, _T2, _T3, _T4]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> ReturningInsert[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
/,
*,
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> ReturningInsert[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- *,
+ *entities: _ColumnsClauseArgument[Any],
sort_by_parameter_order: bool = False,
- ) -> ReturningInsert[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ ) -> ReturningInsert[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.returning
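# Illustrative, runnable Core sketch, not part of the patch: the re-typed
# Insert.returning().  The table, engine URL and data are made up, and the
# RETURNING clause assumes a backend that supports it (for example SQLite
# 3.35+ or PostgreSQL).
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy import create_engine, insert

metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(30)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

# typed ReturningInsert[int, str] in 2.1
# (was ReturningInsert[Tuple[int, str]] in 2.0)
stmt = insert(users).returning(users.c.id, users.c.name)

with engine.begin() as conn:
    row = conn.execute(stmt, {"name": "spongebob"}).one()
    new_id, new_name = row  # Row[int, str] unpacks directly
    print(new_id, new_name)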
...
-class ReturningInsert(Insert, TypedReturnsRows[_TP]):
+class ReturningInsert(Insert, TypedReturnsRows[Unpack[_Ts]]):
"""Typing-only class that establishes a generic type form of
:class:`.Insert` which tracks returned column types.
# statically generated** by tools/generate_tuple_map_overloads.py
@overload
- def returning(
- self, __ent0: _TCCA[_T0], /
- ) -> ReturningUpdate[Tuple[_T0]]:
+ def returning(self, __ent0: _TCCA[_T0], /) -> ReturningUpdate[_T0]:
...
@overload
def returning(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
- ) -> ReturningUpdate[Tuple[_T0, _T1]]:
+ ) -> ReturningUpdate[_T0, _T1]:
...
@overload
def returning(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2]]:
+ ) -> ReturningUpdate[_T0, _T1, _T2]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> ReturningUpdate[_T0, _T1, _T2, _T3]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> ReturningUpdate[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> ReturningUpdate[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> ReturningUpdate[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> ReturningUpdate[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> ReturningUpdate[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.returning
...
-class ReturningUpdate(Update, TypedReturnsRows[_TP]):
+class ReturningUpdate(Update, TypedReturnsRows[Unpack[_Ts]]):
"""Typing-only class that establishes a generic type form of
:class:`.Update` which tracks returned column types.
# statically generated** by tools/generate_tuple_map_overloads.py
@overload
- def returning(
- self, __ent0: _TCCA[_T0], /
- ) -> ReturningDelete[Tuple[_T0]]:
+ def returning(self, __ent0: _TCCA[_T0], /) -> ReturningDelete[_T0]:
...
@overload
def returning(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], /
- ) -> ReturningDelete[Tuple[_T0, _T1]]:
+ ) -> ReturningDelete[_T0, _T1]:
...
@overload
def returning(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2], /
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2]]:
+ ) -> ReturningDelete[_T0, _T1, _T2]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
/,
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> ReturningDelete[_T0, _T1, _T2, _T3]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
/,
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> ReturningDelete[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
/,
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> ReturningDelete[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
/,
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> ReturningDelete[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
/,
- ) -> ReturningDelete[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ *entities: _ColumnsClauseArgument[Any],
+ ) -> ReturningDelete[
+ _T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7, Unpack[TupleAny]
+ ]:
...
# END OVERLOADED FUNCTIONS self.returning
@overload
def returning(
self, *cols: _ColumnsClauseArgument[Any], **__kw: Any
- ) -> ReturningDelete[Any]:
+ ) -> ReturningDelete[Unpack[TupleAny]]:
...
def returning(
self, *cols: _ColumnsClauseArgument[Any], **__kw: Any
- ) -> ReturningDelete[Any]:
+ ) -> ReturningDelete[Unpack[TupleAny]]:
...
-class ReturningDelete(Update, TypedReturnsRows[_TP]):
+class ReturningDelete(Update, TypedReturnsRows[Unpack[_Ts]]):
"""Typing-only class that establishes a generic type form of
:class:`.Delete` which tracks returned column types.
from ..util import TypingOnly
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from ._typing import _ByArgument
connection: Connection,
distilled_params: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> Result[Any]:
+ ) -> Result[Unpack[TupleAny]]:
if self.supports_execution:
if TYPE_CHECKING:
assert isinstance(self, Executable)
else:
check_value = value
cast(
- "BindParameter[typing_Tuple[Any, ...]]", self
+ "BindParameter[TupleAny]", self
).type = type_._resolve_values_to_types(check_value)
else:
- cast(
- "BindParameter[typing_Tuple[Any, ...]]", self
- ).type = type_
+ cast("BindParameter[TupleAny]", self).type = type_
else:
self.type = type_
or_ = BooleanClauseList.or_
-class Tuple(ClauseList, ColumnElement[typing_Tuple[Any, ...]]):
+class Tuple(ClauseList, ColumnElement[TupleAny]):
"""Represent a SQL tuple."""
__visit_name__ = "tuple"
joins_implicitly=joins_implicitly,
)
- def select(self) -> Select[Tuple[_T]]:
+ def select(self) -> Select[_T]:
"""Produce a :func:`_expression.select` construct
against this :class:`.FunctionElement`.
s = select(function_element)
"""
- s: Select[Any] = Select(self)
+ s: Select[_T] = Select(self)
if self._execution_options:
s = s.execution_options(**self._execution_options)
return s
from . import visitors
from ._typing import _ColumnsClauseArgument
from ._typing import _no_kw
-from ._typing import _TP
from ._typing import is_column_element
from ._typing import is_select_statement
from ._typing import is_subquery
from ..util import HasMemoized_ro_memoized_attribute
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
+
and_ = BooleanClauseList.and_
_T = TypeVar("_T", bound=Any)
+_Ts = TypeVarTuple("_Ts")
+
if TYPE_CHECKING:
from ._typing import _ColumnExpressionArgument
"""base for executable statements that return rows."""
-class TypedReturnsRows(ExecutableReturnsRows, Generic[_TP]):
+class TypedReturnsRows(ExecutableReturnsRows, Generic[Unpack[_Ts]]):
"""base for executable statements that return rows."""
_use_schema_map = False
- def select(self) -> Select[Any]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Return a SELECT of this :class:`_expression.FromClause`.
"join explicitly." % (a.description, b.description)
)
- def select(self) -> Select[Any]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Create a :class:`_expression.Select` from this
:class:`_expression.Join`.
def _init(
self,
- selectable: Select[Any],
+ selectable: Select[Unpack[TupleAny]],
*,
name: Optional[str] = None,
recursive: bool = False,
"first in order to create "
"a subquery, which then can be selected.",
)
- def select(self, *arg: Any, **kw: Any) -> Select[Any]:
+ def select(self, *arg: Any, **kw: Any) -> Select[Unpack[TupleAny]]:
return self._implicit_subquery.select(*arg, **kw)
@HasMemoized.memoized_attribute
def __init__(
self,
- statement: Select[Any],
+ statement: Select[Unpack[TupleAny]],
compiler: Optional[SQLCompiler],
**kw: Any,
):
@classmethod
def get_column_descriptions(
- cls, statement: Select[Any]
+ cls, statement: Select[Unpack[TupleAny]]
) -> List[Dict[str, Any]]:
return [
{
@classmethod
def from_statement(
- cls, statement: Select[Any], from_statement: roles.ReturnsRowsRole
+ cls,
+ statement: Select[Unpack[TupleAny]],
+ from_statement: roles.ReturnsRowsRole,
) -> ExecutableReturnsRows:
cls._plugin_not_implemented()
@classmethod
def get_columns_clause_froms(
- cls, statement: Select[Any]
+ cls, statement: Select[Unpack[TupleAny]]
) -> List[FromClause]:
return cls._normalize_froms(
itertools.chain.from_iterable(
return go
- def _get_froms(self, statement: Select[Any]) -> List[FromClause]:
+ def _get_froms(
+ self, statement: Select[Unpack[TupleAny]]
+ ) -> List[FromClause]:
ambiguous_table_name_map: _AmbiguousTableNameMap
self._ambiguous_table_name_map = ambiguous_table_name_map = {}
def _normalize_froms(
cls,
iterable_of_froms: Iterable[FromClause],
- check_statement: Optional[Select[Any]] = None,
+ check_statement: Optional[Select[Unpack[TupleAny]]] = None,
ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
) -> List[FromClause]:
"""given an iterable of things to select FROM, reduce them to what
@classmethod
def determine_last_joined_entity(
- cls, stmt: Select[Any]
+ cls, stmt: Select[Unpack[TupleAny]]
) -> Optional[_JoinTargetElement]:
if stmt._setup_joins:
return stmt._setup_joins[-1][0]
return None
@classmethod
- def all_selected_columns(cls, statement: Select[Any]) -> _SelectIterable:
+ def all_selected_columns(
+ cls, statement: Select[Unpack[TupleAny]]
+ ) -> _SelectIterable:
return [c for c in _select_iterables(statement._raw_columns)]
def _setup_joins(
return c
@classmethod
- def _generate_for_statement(cls, select_stmt: Select[Any]) -> None:
+ def _generate_for_statement(
+ cls, select_stmt: Select[Unpack[TupleAny]]
+ ) -> None:
if select_stmt._setup_joins or select_stmt._with_options:
self = _MemoizedSelectEntities()
self._raw_columns = select_stmt._raw_columns
HasCompileState,
_SelectFromElements,
GenerativeSelect,
- TypedReturnsRows[_TP],
+ TypedReturnsRows[Unpack[_Ts]],
):
"""Represents a ``SELECT`` statement.
_compile_state_factory: Type[SelectState]
@classmethod
- def _create_raw_select(cls, **kw: Any) -> Select[Any]:
+ def _create_raw_select(cls, **kw: Any) -> Select[Unpack[TupleAny]]:
"""Create a :class:`.Select` using raw ``__new__`` with no coercions.
Used internally to build up :class:`.Select` constructs with
@overload
def scalar_subquery(
- self: Select[Tuple[_MAYBE_ENTITY]],
+ self: Select[_MAYBE_ENTITY],
) -> ScalarSelect[Any]:
...
@overload
def scalar_subquery(
- self: Select[Tuple[_NOT_ENTITY]],
+ self: Select[_NOT_ENTITY],
) -> ScalarSelect[_NOT_ENTITY]:
...
@_generative
def add_columns(
self, *entities: _ColumnsClauseArgument[Any]
- ) -> Select[Any]:
+ ) -> Select[Unpack[TupleAny]]:
r"""Return a new :func:`_expression.select` construct with
the given entities appended to its columns clause.
"be removed in a future release. Please use "
":meth:`_expression.Select.add_columns`",
)
- def column(self, column: _ColumnsClauseArgument[Any]) -> Select[Any]:
+ def column(
+ self, column: _ColumnsClauseArgument[Any]
+ ) -> Select[Unpack[TupleAny]]:
"""Return a new :func:`_expression.select` construct with
the given column expression added to its columns clause.
return self.add_columns(column)
@util.preload_module("sqlalchemy.sql.util")
- def reduce_columns(self, only_synonyms: bool = True) -> Select[Any]:
+ def reduce_columns(
+ self, only_synonyms: bool = True
+ ) -> Select[Unpack[TupleAny]]:
"""Return a new :func:`_expression.select` construct with redundantly
named, equivalently-valued columns removed from the columns clause.
all columns that are equivalent to another are removed.
"""
- woc: Select[Any]
+ woc: Select[Unpack[TupleAny]]
woc = self.with_only_columns(
*util.preloaded.sql_util.reduce_columns(
self._all_selected_columns,
# statically generated** by tools/generate_sel_v1_overloads.py
@overload
- def with_only_columns(self, __ent0: _TCCA[_T0]) -> Select[Tuple[_T0]]:
+ def with_only_columns(self, __ent0: _TCCA[_T0]) -> Select[_T0]:
...
@overload
def with_only_columns(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1]
- ) -> Select[Tuple[_T0, _T1]]:
+ ) -> Select[_T0, _T1]:
...
@overload
def with_only_columns(
self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2]
- ) -> Select[Tuple[_T0, _T1, _T2]]:
+ ) -> Select[_T0, _T1, _T2]:
...
@overload
__ent1: _TCCA[_T1],
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3]]:
+ ) -> Select[_T0, _T1, _T2, _T3]:
...
@overload
__ent2: _TCCA[_T2],
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4]:
...
@overload
__ent3: _TCCA[_T3],
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5]:
...
@overload
__ent4: _TCCA[_T4],
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6]:
...
@overload
__ent5: _TCCA[_T5],
__ent6: _TCCA[_T6],
__ent7: _TCCA[_T7],
- ) -> Select[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]:
+ ) -> Select[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]:
...
# END OVERLOADED FUNCTIONS self.with_only_columns
*entities: _ColumnsClauseArgument[Any],
maintain_column_froms: bool = False,
**__kw: Any,
- ) -> Select[Any]:
+ ) -> Select[Unpack[TupleAny]]:
...
@_generative
*entities: _ColumnsClauseArgument[Any],
maintain_column_froms: bool = False,
**__kw: Any,
- ) -> Select[Any]:
+ ) -> Select[Unpack[TupleAny]]:
r"""Return a new :func:`_expression.select` construct with its columns
clause replaced with the given entities.
meth = SelectState.get_plugin_class(self).all_selected_columns
return list(meth(self))
- def _ensure_disambiguated_names(self) -> Select[Any]:
+ def _ensure_disambiguated_names(self) -> Select[Unpack[TupleAny]]:
if self._label_style is LABEL_STYLE_NONE:
self = self.set_label_style(LABEL_STYLE_DISAMBIGUATE_ONLY)
return self
by this :class:`_expression.ScalarSelect`.
"""
- self.element = cast("Select[Any]", self.element).where(crit)
+ self.element = cast("Select[Unpack[TupleAny]]", self.element).where(
+ crit
+ )
return self
@overload
if TYPE_CHECKING:
- def _ungroup(self) -> Select[Any]:
+ def _ungroup(self) -> Select[Unpack[TupleAny]]:
...
@_generative
"""
- self.element = cast("Select[Any]", self.element).correlate(
- *fromclauses
- )
+ self.element = cast(
+ "Select[Unpack[TupleAny]]", self.element
+ ).correlate(*fromclauses)
return self
@_generative
"""
- self.element = cast("Select[Any]", self.element).correlate_except(
- *fromclauses
- )
+ self.element = cast(
+ "Select[Unpack[TupleAny]]", self.element
+ ).correlate_except(*fromclauses)
return self
"""
inherit_cache = True
- element: Union[SelectStatementGrouping[Select[Any]], ScalarSelect[Any]]
+ element: Union[
+ SelectStatementGrouping[Select[Unpack[TupleAny]]],
+ ScalarSelect[Any],
+ ]
def __init__(
self,
return []
def _regroup(
- self, fn: Callable[[Select[Any]], Select[Any]]
- ) -> SelectStatementGrouping[Select[Any]]:
+ self,
+ fn: Callable[[Select[Unpack[TupleAny]]], Select[Unpack[TupleAny]]],
+ ) -> SelectStatementGrouping[Select[Unpack[TupleAny]]]:
element = self.element._ungroup()
new_element = fn(element)
assert isinstance(return_value, SelectStatementGrouping)
return return_value
- def select(self) -> Select[Any]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Return a SELECT of this :class:`_expression.Exists`.
e.g.::
from ..util import OrderedDict
from ..util.typing import is_literal
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import typing_get_args
if TYPE_CHECKING:
)
-class TupleType(TypeEngine[Tuple[Any, ...]]):
+class TupleType(TypeEngine[TupleAny]):
"""represent the composite type of a Tuple."""
_is_tuple_type = True
from .. import exc
from .. import util
from ..util.typing import Literal
+from ..util.typing import Unpack
if typing.TYPE_CHECKING:
from ._typing import _EquivalentColumnMap
__slots__ = ("row",)
- def __init__(self, row: Row[Any], max_chars: int = 300):
+ def __init__(
+ self, row: Row[Unpack[Tuple[Any, ...]]], max_chars: int = 300
+ ):
self.row = row
self.max_chars = max_chars
from typing_extensions import ParamSpec as ParamSpec # 3.10
from typing_extensions import TypeAlias as TypeAlias # 3.10
from typing_extensions import TypeGuard as TypeGuard # 3.10
+ from typing_extensions import TypeVarTuple as TypeVarTuple # 3.11
from typing_extensions import Self as Self # 3.11
from typing_extensions import TypeAliasType as TypeAliasType # 3.12
+ from typing_extensions import Unpack as Unpack # 3.11
+
_T = TypeVar("_T", bound=Any)
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
_VT_co = TypeVar("_VT_co", covariant=True)
+TupleAny = Tuple[Any, ...]
+
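# Illustrative, self-contained sketch, not part of the patch: the pep-646
# pattern that the TupleAny / Unpack / TypeVarTuple aliases above support.
# ``Frame`` is a hypothetical stand-in for variadic generics such as Result
# and Row; on Python 3.11+ TypeVarTuple and Unpack are also available from
# the stdlib ``typing`` module.
from __future__ import annotations

from typing import Any, Generic, Tuple

from typing_extensions import TypeVarTuple, Unpack

Ts = TypeVarTuple("Ts")
TupleAny = Tuple[Any, ...]


class Frame(Generic[Unpack[Ts]]):
    """Variadic generic: Frame[int, str] has two typed elements."""

    def first(self) -> Tuple[Unpack[Ts]]:
        raise NotImplementedError


def typed(frame: Frame[int, str]) -> Tuple[int, str]:
    # per-position element types, with no Tuple[...] wrapper
    return frame.first()


def untyped(frame: Frame[Unpack[TupleAny]]) -> TupleAny:
    # arbitrary-width fallback, filling the role the plain [Any]
    # parameter used to fill
    return frame.first()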
if compat.py310:
# why they took until py310 to put this in stdlib is beyond me,
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
+from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.util import picklers
from sqlalchemy.util import compat
eq_(m1.fetchone(), {"a": 1, "b": 1, "c": 1})
eq_(r1.fetchone(), (2, 1, 2))
+ @expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_tuples_plus_base(self):
r1 = self._fixture()
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import assert_warns_message
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.testing.assertions import is_not_none
assert isinstance(row, collections_abc.Sequence)
assert isinstance(row._mapping, collections_abc.Mapping)
+ @expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_single_entity_tuples(self):
User = self.classes.User
query = fixture_session().query(User).tuples()
assert isinstance(row, collections_abc.Sequence)
assert isinstance(row._mapping, collections_abc.Mapping)
+ @expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_multiple_entity_true_tuples(self):
User = self.classes.User
query = fixture_session().query(User.id, User).tuples()
from sqlalchemy.testing import assertions
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
r = connection.scalars(users.select().order_by(users.c.user_id))
eq_(r.all(), [7, 8, 9])
+ @expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_result_tuples(self, connection):
users = self.tables.users
).tuples()
eq_(r.all(), [(7, "jack"), (8, "ed"), (9, "fred")])
+ @expect_deprecated(".*is deprecated, Row now behaves like a tuple.*")
def test_row_tuple(self, connection):
users = self.tables.users
result = conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
with e.begin() as conn:
result = conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
engine = create_engine("postgresql://scott:tiger@localhost/test")
result = await conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
# stream with direct await
async_result = await conn.stream(text("select * from table"))
- # EXPECTED_TYPE: AsyncResult[Any]
+ # EXPECTED_TYPE: AsyncResult[Unpack[.*tuple[Any, ...]]]
reveal_type(async_result)
# stream with context manager
async with conn.stream(
text("select * from table")
) as ctx_async_result:
- # EXPECTED_TYPE: AsyncResult[Any]
+ # EXPECTED_TYPE: AsyncResult[Unpack[.*tuple[Any, ...]]]
reveal_type(ctx_async_result)
# stream_scalars with direct await
result = await conn.execute(text("select * from table"))
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
# EXPECTED_TYPE: SQLCoreOperations[bool]
reveal_type(expr4)
- # EXPECTED_TYPE: Select[Tuple[bool]]
+ # EXPECTED_TYPE: Select[bool]
reveal_type(stmt2)
stmt = select(Vertex).where(Vertex.start.in_([Point(3, 4)]))
-# EXPECTED_TYPE: Select[Tuple[Vertex]]
+# EXPECTED_TYPE: Select[Vertex]
reveal_type(stmt)
# EXPECTED_TYPE: composite.Point
stmt = select(Vertex).where(Vertex.start.in_([Point(3, 4)]))
-# EXPECTED_TYPE: Select[Tuple[Vertex]]
+# EXPECTED_TYPE: Select[Vertex]
reveal_type(stmt)
# EXPECTED_TYPE: composite.Point
def do_something_with_mapped_class(
cls_: MappedClassProtocol[Employee],
) -> None:
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(cls_.__table__.select())
# EXPECTED_TYPE: Mapper[Employee]
message_query = select(Message)
if TYPE_CHECKING:
- # EXPECTED_TYPE: Select[Tuple[Message]]
+ # EXPECTED_TYPE: Select[Message]
reveal_type(message_query)
return session.scalars(message_query).all()
poly_query = select(PolymorphicMessage)
if TYPE_CHECKING:
- # EXPECTED_TYPE: Select[Tuple[Message]]
+ # EXPECTED_TYPE: Select[Message]
reveal_type(poly_query)
return session.scalars(poly_query).all()
q2 = sess.query(User.id).filter_by(id=7)
rows2 = q2.all()
- # EXPECTED_TYPE: List[Row[Tuple[int]]]
+ # EXPECTED_TYPE: List[.*Row[.*int].*]
reveal_type(rows2)
# test #8280
# test #9125
for row in sess.query(User.id, User.name):
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(row)
for uobj1 in sess.query(User):
def t_select_1() -> None:
stmt = select(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Result[int, str].*
reveal_type(result)
.fetch(User.id)
)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Tuple[User]]
+ # EXPECTED_TYPE: .*Result[User].*
reveal_type(result)
stmt = select(ua.id, ua.name).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Result[int, str].*
reveal_type(result)
ua = aliased(User)
stmt = select(ua, User).filter(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[User, User]]
+ # EXPECTED_TYPE: Select[User, User]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Tuple[User, User]]
+ # EXPECTED_TYPE: Result[User, User]
reveal_type(result)
reveal_type(q1.all())
# mypy switches to builtins.list for some reason here
- # EXPECTED_RE_TYPE: .*\.[Ll]ist\[.*Row\*?\[Tuple\[.*User\]\]\]
+ # EXPECTED_RE_TYPE: .*\.[Ll]ist\[.*Row\*?\[.*User\].*\]
reveal_type(q1.only_return_tuples(True).all())
# EXPECTED_TYPE: List[Tuple[User]]
def t_legacy_query_cols_1() -> None:
q1 = session.query(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q1)
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(q1.one())
r1 = q1.one()
- x, y = r1.t
+ x, y = r1
# EXPECTED_TYPE: int
reveal_type(x)
def t_legacy_query_cols_tupleq_1() -> None:
q1 = session.query(User.id, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q1)
q2 = q1.tuples()
q2 = q1.with_entities(User.id, User.name)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[int, str]]
+ # EXPECTED_TYPE: RowReturningQuery[int, str]
reveal_type(q2)
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(q2.one())
r1 = q2.one()
- x, y = r1.t
+ x, y = r1
# EXPECTED_TYPE: int
reveal_type(x)
def t_select_with_only_cols() -> None:
q1 = select(User).where(User.id == 5)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(q1)
q2 = q1.with_only_columns(User.id, User.name)
- # EXPECTED_TYPE: Select[Tuple[int, str]]
+ # EXPECTED_TYPE: Select[int, str]
reveal_type(q2)
row = connection.execute(q2).one()
- # EXPECTED_TYPE: Row[Tuple[int, str]]
+ # EXPECTED_TYPE: .*Row[int, str].*
reveal_type(row)
- x, y = row.t
+ x, y = row
# EXPECTED_TYPE: int
reveal_type(x)
a1 = aliased(User)
q1 = session.query(User, a1, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[User, User, str]]
+ # EXPECTED_TYPE: RowReturningQuery[User, User, str]
reveal_type(q1)
- # EXPECTED_TYPE: Row[Tuple[User, User, str]]
+ # EXPECTED_TYPE: .*Row[User, User, str].*
reveal_type(q1.one())
r1 = q1.one()
- x, y, z = r1.t
+ x, y, z = r1
# EXPECTED_TYPE: User
reveal_type(x)
a1 = aliased(User)
q2 = q1.with_entities(User, a1, User.name).filter(User.id == 5)
- # EXPECTED_TYPE: RowReturningQuery[Tuple[User, User, str]]
+ # EXPECTED_TYPE: RowReturningQuery[User, User, str]
reveal_type(q2)
- # EXPECTED_TYPE: Row[Tuple[User, User, str]]
+ # EXPECTED_TYPE: .*Row[User, User, str].*
reveal_type(q2.one())
r1 = q2.one()
- x, y, z = r1.t
+ x, y, z = r1
# EXPECTED_TYPE: User
reveal_type(x)
q2 = q1.add_columns(User.data)
# note this should not match Select
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(q2)
# mypy would downgrade to Any rather than picking the basemost type.
# with typing integrated into Select etc. we can at least get a Select
# object back.
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s2)
# so a fully explicit type may be given
# plain FromClause etc we at least get Select
s3 = select(s1)
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s3)
t1 = User.__table__
s4 = select(t1)
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(s4)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
s2 = insert(User).returning(User)
r2 = session.execute(s2)
- # EXPECTED_TYPE: Result[Tuple[User]]
+ # EXPECTED_TYPE: Result[User]
reveal_type(r2)
s3 = insert(User).returning(func.foo(), column("q"))
- # EXPECTED_TYPE: ReturningInsert[Any]
+ # EXPECTED_TYPE: ReturningInsert[Unpack[.*tuple[Any, ...]]]
reveal_type(s3)
r3 = session.execute(s3)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(r3)
def t_dml_bare_insert() -> None:
s1 = insert(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_bare_update() -> None:
s1 = update(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_update_with_values() -> None:
s1 = update(User).values({User.id: 123, User.data: "value"})
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
def t_dml_bare_delete() -> None:
s1 = delete(User)
r1 = session.execute(s1)
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(r1)
# EXPECTED_TYPE: int
reveal_type(r1.rowcount)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
r1 = session.execute(s1)
- # EXPECTED_TYPE: Result[Tuple[int, str]]
+ # EXPECTED_TYPE: Result[int, str]
reveal_type(r1)
stmt = select(e1)
-# EXPECTED_TYPE: Select[Tuple[bool]]
+# EXPECTED_TYPE: Select[bool]
reveal_type(stmt)
stmt = stmt.where(e1)
stmt = select(e2)
-# EXPECTED_TYPE: Select[Tuple[bool]]
+# EXPECTED_TYPE: Select[bool]
reveal_type(stmt)
stmt = stmt.where(e2)
stmt2 = (
select(User.id).order_by(asc("id"), desc("email")).group_by("email", "id")
)
-# EXPECTED_TYPE: Select[Tuple[int]]
+# EXPECTED_TYPE: Select[int]
reveal_type(stmt2)
stmt2 = select(User.id).order_by(User.id).group_by(User.email)
stmt2 = (
select(User.id).order_by(User.id, User.email).group_by(User.email, User.id)
)
-# EXPECTED_TYPE: Select[Tuple[int]]
+# EXPECTED_TYPE: Select[int]
reveal_type(stmt2)
q1 = Session().query(User.id).order_by("email").group_by("email")
q1 = Session().query(User.id).order_by("id", "email").group_by("email", "id")
-# EXPECTED_TYPE: RowReturningQuery[Tuple[int]]
+# EXPECTED_TYPE: RowReturningQuery[int]
reveal_type(q1)
q1 = Session().query(User.id).order_by(User.id).group_by(User.email)
.order_by(User.id, User.email)
.group_by(User.email, User.id)
)
-# EXPECTED_TYPE: RowReturningQuery[Tuple[int]]
+# EXPECTED_TYPE: RowReturningQuery[int]
reveal_type(q1)
# test 9174
stmt1 = select(func.aggregate_strings(column("x", String), ","))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt1)
stmt2 = select(func.char_length(column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt2)
stmt3 = select(func.coalesce(column("x", Integer)))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt3)
stmt4 = select(func.concat())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt4)
stmt5 = select(func.count(column("x")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt5)
stmt6 = select(func.cume_dist())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*Decimal\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*Decimal\]
reveal_type(stmt6)
stmt7 = select(func.current_date())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*date\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*date\]
reveal_type(stmt7)
stmt8 = select(func.current_time())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*time\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*time\]
reveal_type(stmt8)
stmt9 = select(func.current_timestamp())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt9)
stmt10 = select(func.current_user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt10)
stmt11 = select(func.dense_rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt11)
stmt12 = select(func.localtime())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt12)
stmt13 = select(func.localtimestamp())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt13)
stmt14 = select(func.max(column("x", Integer)))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt14)
stmt15 = select(func.min(column("x", Integer)))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt15)
stmt16 = select(func.next_value(Sequence("x_seq")))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt16)
stmt17 = select(func.now())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt17)
stmt18 = select(func.percent_rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*Decimal\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*Decimal\]
reveal_type(stmt18)
stmt19 = select(func.rank())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt19)
stmt20 = select(func.session_user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt20)
stmt21 = select(func.sum(column("x", Integer)))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt21)
stmt22 = select(func.sysdate())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*datetime\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*datetime\]
reveal_type(stmt22)
stmt23 = select(func.user())
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt23)
# END GENERATED FUNCTION TYPING TESTS
Foo.a,
func.min(Foo.b),
).group_by(Foo.a)
-# EXPECTED_TYPE: Select[Tuple[int, int]]
+# EXPECTED_TYPE: Select[int, int]
reveal_type(stmt1)
# test #10818
Foo.a,
func.coalesce(Foo.c, "a", "b"),
).group_by(Foo.a)
-# EXPECTED_TYPE: Select[Tuple[int, str]]
+# EXPECTED_TYPE: Select[int, str]
reveal_type(stmt2)
from __future__ import annotations
-from typing import Tuple
from typing import TYPE_CHECKING
from sqlalchemy import Column
result = conn.execute(s6)
if TYPE_CHECKING:
- # EXPECTED_TYPE: CursorResult[Any]
+ # EXPECTED_TYPE: CursorResult[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
# we can type these like this
- my_result: Result[Tuple[User]] = conn.execute(s6)
+ my_result: Result[User] = conn.execute(s6)
if TYPE_CHECKING:
# pyright and mypy disagree on the specific type here,
# mypy sees Result as we said, pyright seems to upgrade it to
# CursorResult
- # EXPECTED_RE_TYPE: .*(?:Cursor)?Result\[Tuple\[.*User\]\]
+ # EXPECTED_RE_TYPE: .*(?:Cursor)?Result\[.*User\]
reveal_type(my_result)
import asyncio
from typing import cast
from typing import Optional
-from typing import Tuple
from typing import Type
from sqlalchemy import Column
single_stmt = select(User.name).where(User.name == "foo")
-# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.str\*?\]\]
+# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.str\*?\]
reveal_type(single_stmt)
multi_stmt = select(User.id, User.name).where(User.name == "foo")
-# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+# EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.int\*?, builtins.str\*?\]
reveal_type(multi_stmt)
def t_result_ctxmanager() -> None:
with connection.execute(select(column("q", Integer))) as r1:
- # EXPECTED_TYPE: CursorResult[Tuple[int]]
+ # EXPECTED_TYPE: CursorResult[int]
reveal_type(r1)
with r1.mappings() as r1m:
reveal_type(r2)
with session.execute(select(User.id)) as r3:
- # EXPECTED_TYPE: Result[Tuple[int]]
+ # EXPECTED_TYPE: Result[int]
reveal_type(r3)
with session.scalars(select(User.id)) as r4:
r1 = session.execute(s1)
- # EXPECTED_RE_TYPE: sqlalchemy..*.Result\[Tuple\[builtins.int\*?, typed_results.User\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy..*.Result\[builtins.int\*?, typed_results.User\*?, builtins.str\*?\]
reveal_type(r1)
s2 = select(User, a1).where(User.name == "foo")
r2 = session.execute(s2)
- # EXPECTED_RE_TYPE: sqlalchemy.*Result\[Tuple\[typed_results.User\*?, typed_results.User\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*Result\[typed_results.User\*?, typed_results.User\*?\]
reveal_type(r2)
row = r2.t.one()
# automatically typed since they are dynamically generated
a1_id = cast(Mapped[int], a1.id)
s3 = select(User.id, a1_id, a1, User).where(User.name == "foo")
- # EXPECTED_RE_TYPE: sqlalchemy.*Select\*?\[Tuple\[builtins.int\*?, builtins.int\*?, typed_results.User\*?, typed_results.User\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*Select\*?\[builtins.int\*?, builtins.int\*?, typed_results.User\*?, typed_results.User\*?\]
reveal_type(s3)
# testing Mapped[entity]
some_mp = cast(Mapped[User], object())
s4 = select(some_mp, a1, User).where(User.name == "foo")
- # NOTEXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[typed_results.User\*?, typed_results.User\*?, typed_results.User\*?\]\]
+ # NOTEXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[typed_results.User\*?, typed_results.User\*?, typed_results.User\*?\]
- # sqlalchemy.sql._gen_overloads.Select[Tuple[typed_results.User, typed_results.User, typed_results.User]]
+ # sqlalchemy.sql._gen_overloads.Select[typed_results.User, typed_results.User, typed_results.User]
- # EXPECTED_TYPE: Select[Tuple[User, User, User]]
+ # EXPECTED_TYPE: Select[User, User, User]
reveal_type(s4)
# test plain core expressions
s5 = select(x, y, User.name + "hi")
- # EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[Tuple\[builtins.int\*?, builtins.int\*?\, builtins.str\*?]\]
+ # EXPECTED_RE_TYPE: sqlalchemy..*Select\*?\[builtins.int\*?, builtins.int\*?\, builtins.str\*?]
reveal_type(s5)
def t_ambiguous_result_type_one() -> None:
stmt = select(column("q", Integer), table("x", column("y")))
- # EXPECTED_TYPE: Select[Any]
+ # EXPECTED_TYPE: Select[Unpack[.*tuple[Any, ...]]]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
def t_ambiguous_result_type_two() -> None:
stmt = select(column("q"))
- # EXPECTED_TYPE: Select[Tuple[Any]]
+ # EXPECTED_TYPE: Select[Any]
reveal_type(stmt)
result = session.execute(stmt)
- # EXPECTED_TYPE: Result[Any]
+ # EXPECTED_TYPE: Result[Unpack[.*tuple[Any, ...]]]
reveal_type(result)
a1 = aliased(User)
s1 = select(a1)
- # EXPECTED_TYPE: Select[Tuple[User]]
+ # EXPECTED_TYPE: Select[User]
reveal_type(s1)
s4 = select(a1.name, a1, a1, User).where(User.name == "foo")
- # EXPECTED_TYPE: Select[Tuple[str, User, User, User]]
+ # EXPECTED_TYPE: Select[str, User, User, User]
reveal_type(s4)
def t_connection_execute_multi_row_t() -> None:
result = connection.execute(multi_stmt)
- # EXPECTED_RE_TYPE: sqlalchemy.*CursorResult\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: sqlalchemy.*CursorResult\[builtins.int\*?, builtins.str\*?\]
reveal_type(result)
row = result.one()
- # EXPECTED_RE_TYPE: sqlalchemy.*Row\[Tuple\[builtins.int\*?, builtins.str\*?\]\]
+ # EXPECTED_RE_TYPE: .*sqlalchemy.*Row\[builtins.int\*?, builtins.str\*?\].*
reveal_type(row)
x, y = row.t
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
- stmt: Select[Tuple[User, Other]] = select(User, Other).outerjoin(
+ stmt: Select[User, Other] = select(User, Other).outerjoin(
Other, User.id == Other.id
)
- stmt2: Select[Tuple[User, Optional[Other]]] = select(
+ stmt2: Select[User, Optional[Other]] = select(
User, Nullable(Other)
).outerjoin(Other, User.id == Other.id)
- stmt3: Select[Tuple[int, Optional[str]]] = select(
+ stmt3: Select[int, Optional[str]] = select(
User.id, Nullable(Other.name)
).outerjoin(Other, User.id == Other.id)
def go(W: Optional[Type[Other]]) -> None:
- stmt4: Select[Tuple[str, Other]] = select(
+ stmt4: Select[str, Other] = select(
NotNullable(User.value), NotNullable(W)
).where(User.value.is_not(None))
print(stmt4)
rf"""
stmt{count} = select(func.{key}(column('x', Integer)))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*int\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*int\]
reveal_type(stmt{count})
""",
rf"""
stmt{count} = select(func.{key}(column('x', String), ','))
-# EXPECTED_RE_TYPE: .*Select\[Tuple\[.*str\]\]
+# EXPECTED_RE_TYPE: .*Select\[.*str\]
reveal_type(stmt{count})
""",
fn_class.type, TypeEngine
):
python_type = fn_class.type.python_type
- python_expr = rf"Tuple\[.*{python_type.__name__}\]"
+ python_expr = rf".*{python_type.__name__}"
argspec = inspect.getfullargspec(fn_class)
if fn_class.__name__ == "next_value":
args = "Sequence('x_seq')"
)
for num_args in range(start_index, end_index + 1):
+ ret_suffix = ""
combinations = [
f"__ent{arg}: _TCCA[_T{arg}]"
for arg in range(num_args)
]
+
+ if num_args == end_index:
+ ret_suffix = ", Unpack[TupleAny]"
+ extra_args = (
+ f", *entities: _ColumnsClauseArgument[Any]"
+ f"{extra_args.replace(', *', '')}"
+ )
+
buf.write(
textwrap.indent(
f"""
@overload
def {current_fnname}(
{'self, ' if use_self else ''}{", ".join(combinations)},/{extra_args}
-) -> {return_type}[Tuple[{', '.join(f'_T{i}' for i in range(num_args))}]]:
+) -> {return_type}[{', '.join(f'_T{i}' for i in range(num_args))}{ret_suffix}]:
...
""", # noqa: E501
[testenv:pep484]
deps=
greenlet != 0.4.17
- mypy >= 1.6.0
+ mypy >= 1.7.0
types-greenlet
commands =
mypy {env:MYPY_COLOR} ./lib/sqlalchemy
pytest>=7.0.0rc1,<8
pytest-xdist
greenlet != 0.4.17
- mypy >= 1.2.0
+ mypy >= 1.7.0
patch==1.*
types-greenlet
commands =