from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import Unpack
+from ..util.typing import TypeVarTuple
if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import tuplegetter as tuplegetter
_R = TypeVar("_R", bound=_RowData)
_T = TypeVar("_T", bound=Any)
_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
_InterimRowType = Union[_R, _RawRowType]
"""a catchall "anything" kind of return type that can be applied
def result_tuple(
fields: Sequence[str], extra: Optional[Any] = None
-) -> Callable[[Iterable[Any]], Row[Any]]:
+) -> Callable[[Iterable[Any]], Row[Unpack[_RawRowType]]]:
parent = SimpleResultMetaData(fields, extra)
return functools.partial(
Row, parent, parent._effective_processors, parent._key_to_index
return self._metadata.keys
-class Result(_WithKeys, ResultInternal[Row[_TP]]):
+class Result(_WithKeys, ResultInternal[Row[Unpack[_Ts]]]):
"""Represent a set of database results.
.. versionadded:: 1.4 The :class:`_engine.Result` object provides a
return self._column_slices(col_expressions)
@overload
- def scalars(self: Result[Tuple[_T]]) -> ScalarResult[_T]:
+ def scalars(self: Result[_T, Unpack[_RawRowType]]) -> ScalarResult[_T]:
...
@overload
def scalars(
- self: Result[Tuple[_T]], index: Literal[0]
+ self: Result[_T, Unpack[_RawRowType]], index: Literal[0]
) -> ScalarResult[_T]:
...
return MappingResult(self)
@property
- def t(self) -> TupleResult[_TP]:
+ def t(self) -> TupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
The :attr:`_engine.Result.t` attribute is a synonym for
"""
return self # type: ignore
- def tuples(self) -> TupleResult[_TP]:
+ def tuples(self) -> TupleResult[Tuple[Unpack[_Ts]]]:
"""Apply a "typed tuple" typing filter to returned rows.
This method returns the same :class:`_engine.Result` object
"""
raise NotImplementedError()
- def __iter__(self) -> Iterator[Row[_TP]]:
+ def __iter__(self) -> Iterator[Row[Unpack[_Ts]]]:
return self._iter_impl()
- def __next__(self) -> Row[_TP]:
+ def __next__(self) -> Row[Unpack[_Ts]]:
return self._next_impl()
def partitions(
self, size: Optional[int] = None
- ) -> Iterator[Sequence[Row[_TP]]]:
+ ) -> Iterator[Sequence[Row[Unpack[_Ts]]]]:
"""Iterate through sub-lists of rows of the size given.
Each list will be of the size given, excluding the last list to
else:
break
- def fetchall(self) -> Sequence[Row[_TP]]:
+ def fetchall(self) -> Sequence[Row[Unpack[_Ts]]]:
"""A synonym for the :meth:`_engine.Result.all` method."""
return self._allrows()
- def fetchone(self) -> Optional[Row[_TP]]:
+ def fetchone(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch one row.
When all rows are exhausted, returns None.
else:
return row
- def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[_TP]]:
+ def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[Unpack[_Ts]]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
return self._manyrow_getter(self, size)
- def all(self) -> Sequence[Row[_TP]]:
+ def all(self) -> Sequence[Row[Unpack[_Ts]]]:
"""Return all rows in a list.
Closes the result set after invocation. Subsequent invocations
return self._allrows()
- def first(self) -> Optional[Row[_TP]]:
+ def first(self) -> Optional[Row[Unpack[_Ts]]]:
"""Fetch the first row or ``None`` if no row is present.
Closes the result set and discards remaining rows.
raise_for_second_row=False, raise_for_none=False, scalar=False
)
- def one_or_none(self) -> Optional[Row[_TP]]:
+ def one_or_none(self) -> Optional[Row[Unpack[_Ts]]]:
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
raise_for_second_row=True, raise_for_none=False, scalar=True
)
- def one(self) -> Row[_TP]:
+ def one(self) -> Row[Unpack[_Ts]]:
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
raise_for_second_row=False, raise_for_none=False, scalar=True
)
- def freeze(self) -> FrozenResult[_TP]:
+ def freeze(self) -> FrozenResult[Unpack[_Ts]]:
"""Return a callable object that will produce copies of this
:class:`_engine.Result` when invoked.
return FrozenResult(self)
- def merge(self, *others: Result[Any]) -> MergedResult[_TP]:
+ def merge(self, *others: Result[Any]) -> MergedResult[Unpack[_Ts]]:
"""Merge this :class:`_engine.Result` with other compatible result
objects.
_post_creational_filter: Optional[Callable[[Any], Any]]
- def __init__(self, real_result: Result[Any], index: _KeyIndexType):
+ def __init__(self, real_result: Result[Unpack[_RawRowType]], index: _KeyIndexType):
self._real_result = real_result
if real_result._source_supports_scalars:
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result[Any]):
+ def __init__(self, result: Result[Unpack[_RawRowType]]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
)
-class FrozenResult(Generic[_TP]):
+class FrozenResult(Generic[Unpack[_Ts]]):
"""Represents a :class:`_engine.Result` object in a "frozen" state suitable
for caching.
data: Sequence[Any]
- def __init__(self, result: Result[_TP]):
+ def __init__(self, result: Result[Unpack[_Ts]]):
self.metadata = result._metadata._for_freeze()
self._source_supports_scalars = result._source_supports_scalars
self._attributes = result._attributes
return [list(row) for row in self.data]
def with_new_rows(
- self, tuple_data: Sequence[Row[_TP]]
- ) -> FrozenResult[_TP]:
+ self, tuple_data: Sequence[Row[Unpack[_Ts]]]
+ ) -> FrozenResult[Unpack[_Ts]]:
fr = FrozenResult.__new__(FrozenResult)
fr.metadata = self.metadata
fr._attributes = self._attributes
fr._source_supports_scalars = self._source_supports_scalars
if self._source_supports_scalars:
- fr.data = [d[0] for d in tuple_data]
+ fr.data = [d[0] for d in tuple_data] # type: ignore[misc]
else:
fr.data = tuple_data
return fr
- def __call__(self) -> Result[_TP]:
- result: IteratorResult[_TP] = IteratorResult(
+ def __call__(self) -> Result[Unpack[_Ts]]:
+ result: IteratorResult[Unpack[_Ts]] = IteratorResult(
self.metadata, iter(self.data)
)
result._attributes = self._attributes
return result
-class IteratorResult(Result[_TP]):
+class IteratorResult(Result[Unpack[_Ts]]):
"""A :class:`_engine.Result` that gets data from a Python iterator of
:class:`_engine.Row` objects or similar row-like data.
return IteratorResult(SimpleResultMetaData([]), iter([]))
-class ChunkedIteratorResult(IteratorResult[_TP]):
+class ChunkedIteratorResult(IteratorResult[Unpack[_Ts]]):
"""An :class:`_engine.IteratorResult` that works from an
iterator-producing callable.
return super()._fetchmany_impl(size=size)
-class MergedResult(IteratorResult[_TP]):
+class MergedResult(IteratorResult[Unpack[_Ts]]):
"""A :class:`_engine.Result` that is merged from any number of
:class:`_engine.Result` objects.
rowcount: Optional[int]
def __init__(
- self, cursor_metadata: ResultMetaData, results: Sequence[Result[_TP]]
+ self, cursor_metadata: ResultMetaData, results: Sequence[Result[Unpack[_Ts]]]
):
self._results = results
super().__init__(
from ..sql import util as sql_util
from ..util import deprecated
+from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
from ..util._has_cy import HAS_CYEXTENSION
if TYPE_CHECKING or not HAS_CYEXTENSION:
from .result import _KeyType
from .result import _ProcessorsType
from .result import RMKeyView
+ from typing import Tuple as _RowBase
+else:
+ _RowBase = Sequence
+
_T = TypeVar("_T", bound=Any)
_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+_Ts = TypeVarTuple("_Ts")
-class Row(BaseRow, Sequence[Any], Generic[_TP]):
+class Row(BaseRow, _RowBase[Unpack[_Ts]], Generic[Unpack[_Ts]]):
"""Represent a single result row.
The :class:`.Row` object represents a row of a database result. It is
def __delattr__(self, name: str) -> NoReturn:
raise AttributeError("can't delete attribute")
- def _tuple(self) -> _TP:
+ def _tuple(self) -> Tuple[Unpack[_Ts]]:
"""Return a 'tuple' form of this :class:`.Row`.
At runtime, this method returns "self"; the :class:`.Row` object is
"""
- return self # type: ignore
+ return self
@deprecated(
"2.0.19",
"methods and library-level attributes are intended to be underscored "
"to avoid name conflicts. Please use :meth:`Row._tuple`.",
)
- def tuple(self) -> _TP:
+ def tuple(self) -> Tuple[Unpack[_Ts]]:
"""Return a 'tuple' form of this :class:`.Row`.
.. versionadded:: 2.0
return self._tuple()
@property
- def _t(self) -> _TP:
+ def _t(self) -> Tuple[Unpack[_Ts]]:
"""A synonym for :meth:`.Row._tuple`.
.. versionadded:: 2.0.19 - The :attr:`.Row._t` attribute supersedes
:attr:`.Result.t`
"""
- return self # type: ignore
+ return self
@property
@deprecated(
"methods and library-level attributes are intended to be underscored "
"to avoid name conflicts. Please use :attr:`Row._t`.",
)
- def t(self) -> _TP:
+ def t(self) -> Tuple[Unpack[_Ts]]:
"""A synonym for :meth:`.Row._tuple`.
.. versionadded:: 2.0
def _filter_on_values(
self, processor: Optional[_ProcessorsType]
- ) -> Row[Any]:
+ ) -> Row[Unpack[_Ts]]:
return Row(self._parent, processor, self._key_to_index, self._data)
if not TYPE_CHECKING:
__hash__ = BaseRow.__hash__
- if TYPE_CHECKING:
-
- @overload
- def __getitem__(self, index: int) -> Any:
- ...
-
- @overload
- def __getitem__(self, index: slice) -> Sequence[Any]:
- ...
-
- def __getitem__(self, index: Union[int, slice]) -> Any:
- ...
-
def __lt__(self, other: Any) -> bool:
return self._op(other, operator.lt)