from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
-from typing import TypeVar
from typing import Union
from .result import IteratorResult
def __init__(
self,
- parent: CursorResult[Any],
+ parent: CursorResult[Unpack[Tuple[Any, ...]]],
cursor_description: _DBAPICursorDescription,
):
context = parent.context
alternate_cursor_description: Optional[_DBAPICursorDescription] = None
def soft_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ self,
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
+ dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def hard_close(
- self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ self,
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
+ dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def yield_per(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
dbapi_cursor: Optional[DBAPICursor],
num: int,
) -> None:
def fetchone(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
dbapi_cursor: DBAPICursor,
hard_close: bool = False,
) -> Any:
def fetchmany(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
dbapi_cursor: DBAPICursor,
size: Optional[int] = None,
) -> Any:
def fetchall(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
dbapi_cursor: DBAPICursor,
) -> Any:
raise NotImplementedError()
def handle_exception(
self,
- result: CursorResult[Any],
+ result: CursorResult[Unpack[Tuple[Any, ...]]],
dbapi_cursor: Optional[DBAPICursor],
err: BaseException,
) -> NoReturn:
def _raw_row_iterator(self):
return self._fetchiter_impl()
- def merge(self, *others: Result[Unpack[Tuple[Any, ...]]]) -> MergedResult[Unpack[Tuple[Any, ...]]]:
+ def merge(
+ self, *others: Result[Unpack[Tuple[Any, ...]]]
+ ) -> MergedResult[Unpack[Tuple[Any, ...]]]:
merged_result = super().merge(*others)
setup_rowcounts = self.context._has_rowcount
if setup_rowcounts:
result_column_struct: Optional[
Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
- returned_default_rows: Optional[Sequence[Row[Unpack[Tuple[Any, ...]]]]] = None
+ returned_default_rows: Optional[
+ Sequence[Row[Unpack[Tuple[Any, ...]]]]
+ ] = None
execution_options: _ExecuteOptions = util.EMPTY_DICT
from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import Literal
from ..util.typing import Self
-from ..util.typing import Unpack
from ..util.typing import TypeVarTuple
+from ..util.typing import Unpack
if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import tuplegetter as tuplegetter
else:
return row
- def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[Unpack[_Ts]]]:
+ def fetchmany(
+ self, size: Optional[int] = None
+ ) -> Sequence[Row[Unpack[_Ts]]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
return FrozenResult(self)
- def merge(self, *others: Result[Any]) -> MergedResult[Unpack[_Ts]]:
+ def merge(
+ self, *others: Result[Unpack[_RawRowType]]
+ ) -> MergedResult[Unpack[_RawRowType]]:
"""Merge this :class:`_engine.Result` with other compatible result
objects.
_post_creational_filter: Optional[Callable[[Any], Any]]
- def __init__(self, real_result: Result[Unpack[_RawRowType]], index: _KeyIndexType):
+ def __init__(
+ self, real_result: Result[Unpack[_RawRowType]], index: _KeyIndexType
+ ):
self._real_result = real_result
if real_result._source_supports_scalars:
rowcount: Optional[int]
def __init__(
- self, cursor_metadata: ResultMetaData, results: Sequence[Result[Unpack[_Ts]]]
+ self,
+ cursor_metadata: ResultMetaData,
+ results: Sequence[Result[Unpack[_Ts]]],
):
self._results = results
super().__init__(
from typing import Mapping
from typing import NoReturn
from typing import Optional
-from typing import overload
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
-from typing import Union
from ..sql import util as sql_util
from ..util import deprecated
+from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
-from ..util._has_cy import HAS_CYEXTENSION
if TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import BaseRow as BaseRow
from sqlalchemy.cyextension.resultproxy import BaseRow as BaseRow
if TYPE_CHECKING:
+ from typing import Tuple as _RowBase
+
from .result import _KeyType
from .result import _ProcessorsType
from .result import RMKeyView
- from typing import Tuple as _RowBase
else:
_RowBase = Sequence
from ..orm.session import ORMExecuteState
from ..orm.state import InstanceState
from ..sql import Executable
- from ..sql._typing import _TP
from ..sql.elements import ClauseElement
__all__ = ["ShardedSession", "ShardedQuery"]
__slots__ = ("row",)
- def __init__(self, row: Row[Unpack[Tuple[Any, ...]]], max_chars: int = 300):
+ def __init__(
+ self, row: Row[Unpack[Tuple[Any, ...]]], max_chars: int = 300
+ ):
self.row = row
self.max_chars = max_chars