be imported only when the asyncio extension is first imported.
Alternatively, the ``greenlet`` library is still imported lazily on
first use to support use case that don't make direct use of the
- SQLAlchemy asyncio extension.
\ No newline at end of file
+ SQLAlchemy asyncio extension.
from .. import util
from ..sql import compiler
from ..sql import util as sql_util
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
def execute(
parameters: Optional[_CoreAnyExecuteParams] = None,
*,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
func: FunctionElement[Any],
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(
ddl: ExecutableDDLElement,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a schema.DDL object."""
execution_options = ddl._execution_options.merge_with(
elem: Executable,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.ClauseElement object."""
execution_options = elem._execution_options.merge_with(
compiled: Compiled,
distilled_parameters: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Execute a sql.Compiled object.
TODO: why do we have this? likely deprecate or remove
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[CoreExecuteOptionsParameter] = None,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
r"""Executes a string SQL statement on the DBAPI cursor directly,
without any SQL compilation steps.
execution_options: _ExecuteOptions,
*args: Any,
**kw: Any,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
context: ExecutionContext,
statement: Union[str, Compiled],
parameters: Optional[_AnyMultiExecuteParams],
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""continue the _execute_context() method for a single DBAPI
cursor.execute() or cursor.executemany() call.
self,
dialect: Dialect,
context: ExecutionContext,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
"""continue the _execute_context() method for an "insertmanyvalues"
operation, which will invoke DBAPI
cursor.execute() one or more times with individual log and
from ..util import compat
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
def __init__(
self,
- parent: CursorResult[Unpack[Tuple[Any, ...]]],
+ parent: CursorResult[Unpack[TupleAny]],
cursor_description: _DBAPICursorDescription,
):
context = parent.context
def soft_close(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def hard_close(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
) -> None:
raise NotImplementedError()
def yield_per(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
num: int,
) -> None:
def fetchone(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
hard_close: bool = False,
) -> Any:
def fetchmany(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
size: Optional[int] = None,
) -> Any:
def fetchall(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: DBAPICursor,
) -> Any:
raise NotImplementedError()
def handle_exception(
self,
- result: CursorResult[Unpack[Tuple[Any, ...]]],
+ result: CursorResult[Unpack[TupleAny]],
dbapi_cursor: Optional[DBAPICursor],
err: BaseException,
) -> NoReturn:
return self._fetchiter_impl()
def merge(
- self, *others: Result[Unpack[Tuple[Any, ...]]]
- ) -> MergedResult[Unpack[Tuple[Any, ...]]]:
+ self, *others: Result[Unpack[TupleAny]]
+ ) -> MergedResult[Unpack[TupleAny]]:
merged_result = super().merge(*others)
setup_rowcounts = self.context._has_rowcount
if setup_rowcounts:
from ..sql.elements import quoted_name
from ..util.typing import Final
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import Unpack
result_column_struct: Optional[
Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
- returned_default_rows: Optional[
- Sequence[Row[Unpack[Tuple[Any, ...]]]]
- ] = None
+ returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None
execution_options: _ExecuteOptions = util.EMPTY_DICT
from .. import event
from .. import exc
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import Unpack
if typing.TYPE_CHECKING:
multiparams: _CoreMultiExecuteParams,
params: _CoreSingleExecuteParams,
execution_options: _ExecuteOptions,
- result: Result[Unpack[Tuple[Any, ...]]],
+ result: Result[Unpack[TupleAny]],
) -> None:
"""Intercept high level execute() events after execute.
from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
_KeyMapType = Mapping[_KeyType, _KeyMapRecType]
-_RowData = Union[Row[Unpack[Tuple[Any, ...]]], RowMapping, Any]
+_RowData = Union[Row[Unpack[TupleAny]], RowMapping, Any]
"""A generic form of "row" that accommodates for the different kinds of
"rows" that different result objects return, including row, row mapping, and
scalar values"""
def _getter(
self, key: Any, raiseerr: bool = True
- ) -> Optional[Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]]:
+ ) -> Optional[Callable[[Row[Unpack[TupleAny]]], Any]]:
index = self._index_for_key(key, raiseerr)
if index is not None:
class ResultInternal(InPlaceGenerative, Generic[_R]):
__slots__ = ()
- _real_result: Optional[Result[Unpack[Tuple[Any, ...]]]] = None
+ _real_result: Optional[Result[Unpack[TupleAny]]] = None
_generate_rows: bool = True
_row_logging_fn: Optional[Callable[[Any], Any]]
def _fetchiter_impl(
self,
- ) -> Iterator[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> Iterator[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _fetchall_impl(
self,
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
raise NotImplementedError()
def _soft_close(self, hard: bool = False) -> None:
@HasMemoized_ro_memoized_attribute
def _row_getter(self) -> Optional[Callable[..., _R]]:
- real_result: Result[Unpack[Tuple[Any, ...]]] = (
+ real_result: Result[Unpack[TupleAny]] = (
self._real_result
if self._real_result
- else cast("Result[Unpack[Tuple[Any, ...]]]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if real_result._source_supports_scalars:
fixed_tf = tf
- def make_row(
- row: _InterimRowType[Row[Unpack[Tuple[Any, ...]]]]
- ) -> _R:
+ def make_row(row: _InterimRowType[Row[Unpack[TupleAny]]]) -> _R:
return _make_row_orig(fixed_tf(row))
else:
_log_row = real_result._row_logging_fn
_make_row = make_row
- def make_row(
- row: _InterimRowType[Row[Unpack[Tuple[Any, ...]]]]
- ) -> _R:
+ def make_row(row: _InterimRowType[Row[Unpack[TupleAny]]]) -> _R:
return _log_row(_make_row(row)) # type: ignore
return make_row
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def iterrows(
- self: Result[Unpack[Tuple[Any, ...]]]
- ) -> Iterator[_R]:
+ def iterrows(self: Result[Unpack[TupleAny]]) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
obj: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
else:
- def iterrows(
- self: Result[Unpack[Tuple[Any, ...]]]
- ) -> Iterator[_R]:
+ def iterrows(self: Result[Unpack[TupleAny]]) -> Iterator[_R]:
for raw_row in self._fetchiter_impl():
row: _InterimRowType[Any] = (
make_row(raw_row) if make_row else raw_row
if self._unique_filter_state:
uniques, strategy = self._unique_strategy
- def onerow(
- self: Result[Unpack[Tuple[Any, ...]]]
- ) -> Union[_NoRow, _R]:
+ def onerow(self: Result[Unpack[TupleAny]]) -> Union[_NoRow, _R]:
_onerow = self._fetchone_impl
while True:
row = _onerow()
else:
- def onerow(
- self: Result[Unpack[Tuple[Any, ...]]]
- ) -> Union[_NoRow, _R]:
+ def onerow(self: Result[Unpack[TupleAny]]) -> Union[_NoRow, _R]:
row = self._fetchone_impl()
if row is None:
return _NO_ROW
real_result = (
self._real_result
if self._real_result
- else cast("Result[Unpack[Tuple[Any, ...]]]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if real_result._yield_per:
num_required = num = real_result._yield_per
real_result = (
self._real_result
if self._real_result
- else cast("Result[Unpack[Tuple[Any, ...]]]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
num = real_result._yield_per
real_result = (
self._real_result
if self._real_result
- else cast("Result[Unpack[Tuple[Any, ...]]]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if not real_result._source_supports_scalars or len(indexes) != 1:
real_result = (
self._real_result
if self._real_result is not None
- else cast("Result[Unpack[Tuple[Any, ...]]]", self)
+ else cast("Result[Unpack[TupleAny]]", self)
)
if not strategy and self._metadata._unique_filters:
__slots__ = ("_metadata", "__dict__")
_row_logging_fn: Optional[
- Callable[[Row[Unpack[Tuple[Any, ...]]]], Row[Unpack[Tuple[Any, ...]]]]
+ Callable[[Row[Unpack[TupleAny]]], Row[Unpack[TupleAny]]]
] = None
_source_supports_scalars: bool = False
def _getter(
self, key: _KeyIndexType, raiseerr: bool = True
- ) -> Optional[Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]]:
+ ) -> Optional[Callable[[Row[Unpack[TupleAny]]], Any]]:
"""return a callable that will retrieve the given key from a
:class:`_engine.Row`.
_post_creational_filter: Optional[Callable[[Any], Any]]
- _real_result: Result[Unpack[Tuple[Any, ...]]]
+ _real_result: Result[Unpack[TupleAny]]
def __enter__(self) -> Self:
return self
def _fetchiter_impl(
self,
- ) -> Iterator[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> Iterator[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchiter_impl()
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchone_impl(hard_close=hard_close)
def _fetchall_impl(
self,
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchall_impl()
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
return self._real_result._fetchmany_impl(size=size)
def _fetchone_impl(
self, hard_close: bool = False
- ) -> Optional[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> Optional[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
def _fetchall_impl(
self,
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
try:
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self._hard_closed:
self._raise_hard_closed()
def _fetchmany_impl(
self, size: Optional[int] = None
- ) -> List[_InterimRowType[Row[Unpack[Tuple[Any, ...]]]]]:
+ ) -> List[_InterimRowType[Row[Unpack[TupleAny]]]]:
if self.dynamic_yield_per:
self.iterator = itertools.chain.from_iterable(self.chunks(size))
return super()._fetchmany_impl(size=size)
from ...util.concurrency import greenlet_spawn
from ...util.typing import Literal
from ...util.typing import Self
+from ...util.typing import TupleAny
from ...util.typing import TypeVarTuple
from ...util.typing import Unpack
class AsyncCommon(FilterResult[_R]):
__slots__ = ()
- _real_result: Result[Unpack[Tuple[Any, ...]]]
+ _real_result: Result[Unpack[TupleAny]]
_metadata: ResultMetaData
async def close(self) -> None: # type: ignore[override]
@overload
def scalars(
- self: AsyncResult[_T, Unpack[Tuple[Any, ...]]], index: Literal[0]
+ self: AsyncResult[_T, Unpack[TupleAny]], index: Literal[0]
) -> AsyncScalarResult[_T]:
...
@overload
def scalars(
- self: AsyncResult[_T, Unpack[Tuple[Any, ...]]],
+ self: AsyncResult[_T, Unpack[TupleAny]],
) -> AsyncScalarResult[_T]:
...
def __init__(
self,
- real_result: Result[Unpack[Tuple[Any, ...]]],
+ real_result: Result[Unpack[TupleAny]],
index: _KeyIndexType,
):
self._real_result = real_result
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result[Unpack[Tuple[Any, ...]]]):
+ def __init__(self, result: Result[Unpack[TupleAny]]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
...
-_RT = TypeVar("_RT", bound="Result[Unpack[Tuple[Any, ...]]]")
+_RT = TypeVar("_RT", bound="Result[Unpack[TupleAny]]")
async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT:
from typing import Optional
from typing import overload
from typing import Sequence
-from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from ...util import ScopedRegistry
from ...util import warn
from ...util import warn_deprecated
+from ...util.typing import TupleAny
from ...util.typing import TypeVarTuple
from ...util.typing import Unpack
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
async def execute(
execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
def identity_key(
cls,
class_: Optional[Type[Any]] = None,
- ident: Union[Any, Tuple[Any, ...]] = None,
+ ident: Union[Any, TupleAny] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Unpack[Tuple[Any, ...]]], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
from ..sql.selectable import SelectState
from ..sql.selectable import TypedReturnsRows
from ..sql.visitors import InternalTraversal
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
self,
compile_state: CompileState,
statement: Union[
- Select[Unpack[Tuple[Any, ...]]],
- FromStatement[Unpack[Tuple[Any, ...]]],
+ Select[Unpack[TupleAny]],
+ FromStatement[Unpack[TupleAny]],
],
params: _CoreSingleExecuteParams,
session: Session,
attributes: Dict[Any, Any]
global_attributes: Dict[Any, Any]
- statement: Union[
- Select[Unpack[Tuple[Any, ...]]], FromStatement[Unpack[Tuple[Any, ...]]]
- ]
+ statement: Union[Select[Unpack[TupleAny]], FromStatement[Unpack[TupleAny]]]
select_statement: Union[
- Select[Unpack[Tuple[Any, ...]]], FromStatement[Unpack[Tuple[Any, ...]]]
+ Select[Unpack[TupleAny]], FromStatement[Unpack[TupleAny]]
]
_entities: List[_QueryEntity]
_polymorphic_adapters: Dict[_InternalEntityType, ORMAdapter]
dedupe_columns: Set[ColumnElement[Any]]
create_eager_joins: List[
# TODO: this structure is set up by JoinedLoader
- Tuple[Any, ...]
+ TupleAny
]
current_path: PathRegistry = _path_registry
_has_mapper_entities = False
def _legacy_filter_by_entity_zero(
- query_or_augmented_select: Union[
- Query[Any], Select[Unpack[Tuple[Any, ...]]]
- ]
+ query_or_augmented_select: Union[Query[Any], Select[Unpack[TupleAny]]]
) -> Optional[_InternalEntityType[Any]]:
self = query_or_augmented_select
if self._setup_joins:
def _entity_from_pre_ent_zero(
- query_or_augmented_select: Union[
- Query[Any], Select[Unpack[Tuple[Any, ...]]]
- ]
+ query_or_augmented_select: Union[Query[Any], Select[Unpack[TupleAny]]]
) -> Optional[_InternalEntityType[Any]]:
self = query_or_augmented_select
if not self._raw_columns:
from ..sql.elements import BindParameter
from ..util.typing import is_fwd_ref
from ..util.typing import is_pep593
+from ..util.typing import TupleAny
from ..util.typing import typing_get_args
from ..util.typing import Unpack
def create_row_processor(
self,
- query: Select[Unpack[Tuple[Any, ...]]],
- procs: Sequence[Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]],
+ query: Select[Unpack[TupleAny]],
+ procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
labels: Sequence[str],
- ) -> Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]:
- def proc(row: Row[Unpack[Tuple[Any, ...]]]) -> Any:
+ ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
+ def proc(row: Row[Unpack[TupleAny]]) -> Any:
return self.property.composite_class(
*[proc(row) for proc in procs]
)
from ..sql.type_api import TypeEngine
from ..util import warn_deprecated
from ..util.typing import RODescriptorReference
+from ..util.typing import TupleAny
from ..util.typing import TypedDict
from ..util.typing import Unpack
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Unpack[Tuple[Any, ...]]],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
query_entity: _MapperEntity,
path: AbstractEntityRegistry,
mapper: Mapper[Any],
- result: Result[Unpack[Tuple[Any, ...]]],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
path: AbstractEntityRegistry,
loadopt: Optional[_LoadElement],
mapper: Mapper[Any],
- result: Result[Unpack[Tuple[Any, ...]]],
+ result: Result[Unpack[TupleAny]],
adapter: Optional[ORMAdapter],
populators: _PopulatorDict,
) -> None:
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import SelectState
from ..util import EMPTY_DICT
+from ..util.typing import TupleAny
from ..util.typing import Unpack
-
if TYPE_CHECKING:
from ._typing import _IdentityKeyType
from .base import LoaderCallableStatus
def instances(
- cursor: CursorResult[Unpack[Tuple[Any, ...]]], context: QueryContext
-) -> Result[Unpack[Tuple[Any, ...]]]:
+ cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
+) -> Result[Unpack[TupleAny]]:
"""Return a :class:`.Result` given an ORM query context.
:param cursor: a :class:`.CursorResult`, generated by a statement
from ..util import HasMemoized
from ..util import HasMemoized_ro_memoized_attribute
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import Unpack
if TYPE_CHECKING:
def identity_key_from_row(
self,
- row: Optional[Union[Row[Unpack[Tuple[Any, ...]]], RowMapping]],
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]],
identity_token: Optional[Any] = None,
adapter: Optional[ORMAdapter] = None,
) -> _IdentityKeyType[_O]:
from ..util import deprecated
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
def _final_statement(
self, legacy_query_style: bool = True
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
"""Return the 'final' SELECT statement for this :class:`.Query`.
This is used by the testing suite only and is fairly inefficient.
from ..util import warn
from ..util import warn_deprecated
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a SQL expression construct.
.. container:: class_bases
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Unpack[Tuple[Any, ...]]], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
from ..util import IdentitySet
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Union[Row[Unpack[Tuple[Any, ...]]], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
"""Return an identity key.
params: Optional[_CoreAnyExecuteParams] = None,
execution_options: Optional[OrmExecuteOptionsParameter] = None,
bind_arguments: Optional[_BindArguments] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
"""Execute the statement represented by this
:class:`.ORMExecuteState`, without re-invoking events that have
already proceeded.
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
_scalar_result: bool = ...,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
def _execute_internal(
)
for idx, fn in enumerate(events_todo):
orm_exec_state._starting_event_idx = idx
- fn_result: Optional[Result[Unpack[Tuple[Any, ...]]]] = fn(
+ fn_result: Optional[Result[Unpack[TupleAny]]] = fn(
orm_exec_state
)
if fn_result:
if compile_state_cls:
result: Result[
- Unpack[Tuple[Any, ...]]
+ Unpack[TupleAny]
] = compile_state_cls.orm_execute_statement(
self,
statement,
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> CursorResult[Unpack[Tuple[Any, ...]]]:
+ ) -> CursorResult[Unpack[TupleAny]]:
...
@overload
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
...
def execute(
bind_arguments: Optional[_BindArguments] = None,
_parent_execute_state: Optional[Any] = None,
_add_event: Optional[Any] = None,
- ) -> Result[Unpack[Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
r"""Execute a SQL expression construct.
Returns a :class:`_engine.Result` object representing
with_for_update = ForUpdateArg._from_argument(with_for_update)
- stmt: Select[Unpack[Tuple[Any, ...]]] = sql.select(
- object_mapper(instance)
- )
+ stmt: Select[Unpack[TupleAny]] = sql.select(object_mapper(instance))
if (
loading.load_on_ident(
self,
from .. import util
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import Unpack
if TYPE_CHECKING:
self,
state: InstanceState[_O],
dict_: _InstanceDict,
- row: Row[Unpack[Tuple[Any, ...]]],
+ row: Row[Unpack[TupleAny]],
) -> None:
...
def _set_callable(
state: InstanceState[_O],
dict_: _InstanceDict,
- row: Row[Unpack[Tuple[Any, ...]]],
+ row: Row[Unpack[TupleAny]],
) -> None:
if "callables" not in state.__dict__:
state.callables = {}
def _set_callable(
state: InstanceState[_O],
dict_: _InstanceDict,
- row: Row[Unpack[Tuple[Any, ...]]],
+ row: Row[Unpack[TupleAny]],
) -> None:
if "callables" not in state.__dict__:
state.callables = {}
from ..util.typing import is_origin_of_cls
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import typing_get_origin
from ..util.typing import Unpack
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[_T] = None,
- row: Optional[Union[Row[Unpack[Tuple[Any, ...]]], RowMapping]] = None,
+ row: Optional[Union[Row[Unpack[TupleAny]], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[_T]:
r"""Generate "identity key" tuples, as are used as keys in the
def create_row_processor(
self,
- query: Select[Unpack[Tuple[Any, ...]]],
- procs: Sequence[Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]],
+ query: Select[Unpack[TupleAny]],
+ procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
labels: Sequence[str],
- ) -> Callable[[Row[Unpack[Tuple[Any, ...]]]], Any]:
+ ) -> Callable[[Row[Unpack[TupleAny]]], Any]:
"""Produce the "row processing" function for this :class:`.Bundle`.
May be overridden by subclasses to provide custom behaviors when
"""
keyed_tuple = result_tuple(labels, [() for l in labels])
- def proc(row: Row[Unpack[Tuple[Any, ...]]]) -> Any:
+ def proc(row: Row[Unpack[TupleAny]]) -> Any:
return keyed_tuple([proc(row) for proc in procs])
return proc
from typing import Any
from typing import Optional
from typing import overload
-from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
from .selectable import TableClause
from .selectable import TableSample
from .selectable import Values
+from ..util.typing import TupleAny
from ..util.typing import Unpack
if TYPE_CHECKING:
@overload
def select(
*entities: _ColumnsClauseArgument[Any], **__kw: Any
-) -> Select[Unpack[Tuple[Any, ...]]]:
+) -> Select[Unpack[TupleAny]]:
...
def select(
*entities: _ColumnsClauseArgument[Any], **__kw: Any
-) -> Select[Unpack[Tuple[Any, ...]]]:
+) -> Select[Unpack[TupleAny]]:
r"""Construct a new :class:`_expression.Select`.
from typing import Optional
from typing import overload
from typing import Set
-from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from ..inspection import Inspectable
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import TypeAlias
from ..util.typing import Unpack
def is_select_statement(
t: Union[Executable, ReturnsRows]
- ) -> TypeGuard[Select[Unpack[Tuple[Any, ...]]]]:
+ ) -> TypeGuard[Select[Unpack[TupleAny]]]:
...
def is_table(t: FromClause) -> TypeGuard[TableClause]:
from ..util import FastIntFlag
from ..util.typing import Literal
from ..util.typing import Protocol
+from ..util.typing import TupleAny
from ..util.typing import TypedDict
from ..util.typing import Unpack
need_result_map_for_nested: bool
need_result_map_for_compound: bool
select_0: ReturnsRows
- insert_from_select: Select[Unpack[Tuple[Any, ...]]]
+ insert_from_select: Select[Unpack[TupleAny]]
class ExpandedState(NamedTuple):
return text
def _setup_select_hints(
- self, select: Select[Unpack[Tuple[Any, ...]]]
+ self, select: Select[Unpack[TupleAny]]
) -> Tuple[str, _FromHintsType]:
byfrom = {
from_: hinttext
from .. import exc
from .. import util
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeGuard
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
_supports_multi_parameters = False
- select: Optional[Select[Unpack[Tuple[Any, ...]]]] = None
+ select: Optional[Select[Unpack[TupleAny]]] = None
"""SELECT statement for INSERT .. FROM SELECT"""
_post_values_clause: Optional[ClauseElement] = None
@overload
def returning(
self, *cols: _ColumnsClauseArgument[Any], **__kw: Any
- ) -> ReturningDelete[Unpack[Tuple[Any, ...]]]:
+ ) -> ReturningDelete[Unpack[TupleAny]]:
...
def returning(
self, *cols: _ColumnsClauseArgument[Any], **__kw: Any
- ) -> ReturningDelete[Unpack[Tuple[Any, ...]]]:
+ ) -> ReturningDelete[Unpack[TupleAny]]:
...
from ..util import TypingOnly
from ..util.typing import Literal
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import Unpack
if typing.TYPE_CHECKING:
connection: Connection,
distilled_params: _CoreMultiExecuteParams,
execution_options: CoreExecuteOptionsParameter,
- ) -> Result[Unpack[typing_Tuple[Any, ...]]]:
+ ) -> Result[Unpack[TupleAny]]:
if self.supports_execution:
if TYPE_CHECKING:
assert isinstance(self, Executable)
else:
check_value = value
cast(
- "BindParameter[typing_Tuple[Any, ...]]", self
+ "BindParameter[TupleAny]", self
).type = type_._resolve_values_to_types(check_value)
else:
- cast(
- "BindParameter[typing_Tuple[Any, ...]]", self
- ).type = type_
+ cast("BindParameter[TupleAny]", self).type = type_
else:
self.type = type_
or_ = BooleanClauseList.or_
-class Tuple(ClauseList, ColumnElement[typing_Tuple[Any, ...]]):
+class Tuple(ClauseList, ColumnElement[TupleAny]):
"""Represent a SQL tuple."""
__visit_name__ = "tuple"
from ..util.typing import Literal
from ..util.typing import Protocol
from ..util.typing import Self
+from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack
_use_schema_map = False
- def select(self) -> Select[Unpack[Tuple[Any, ...]]]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Return a SELECT of this :class:`_expression.FromClause`.
"join explicitly." % (a.description, b.description)
)
- def select(self) -> Select[Unpack[Tuple[Any, ...]]]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Create a :class:`_expression.Select` from this
:class:`_expression.Join`.
def _init(
self,
- selectable: Select[Unpack[Tuple[Any, ...]]],
+ selectable: Select[Unpack[TupleAny]],
*,
name: Optional[str] = None,
recursive: bool = False,
@classmethod
def from_statement(
cls,
- statement: Select[Unpack[Tuple[Any, ...]]],
+ statement: Select[Unpack[TupleAny]],
from_statement: roles.ReturnsRowsRole,
) -> ExecutableReturnsRows:
cls._plugin_not_implemented()
@classmethod
def get_columns_clause_froms(
- cls, statement: Select[Unpack[Tuple[Any, ...]]]
+ cls, statement: Select[Unpack[TupleAny]]
) -> List[FromClause]:
return cls._normalize_froms(
itertools.chain.from_iterable(
return go
def _get_froms(
- self, statement: Select[Unpack[Tuple[Any, ...]]]
+ self, statement: Select[Unpack[TupleAny]]
) -> List[FromClause]:
ambiguous_table_name_map: _AmbiguousTableNameMap
self._ambiguous_table_name_map = ambiguous_table_name_map = {}
def _normalize_froms(
cls,
iterable_of_froms: Iterable[FromClause],
- check_statement: Optional[Select[Unpack[Tuple[Any, ...]]]] = None,
+ check_statement: Optional[Select[Unpack[TupleAny]]] = None,
ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
) -> List[FromClause]:
"""given an iterable of things to select FROM, reduce them to what
@classmethod
def determine_last_joined_entity(
- cls, stmt: Select[Unpack[Tuple[Any, ...]]]
+ cls, stmt: Select[Unpack[TupleAny]]
) -> Optional[_JoinTargetElement]:
if stmt._setup_joins:
return stmt._setup_joins[-1][0]
@classmethod
def all_selected_columns(
- cls, statement: Select[Unpack[Tuple[Any, ...]]]
+ cls, statement: Select[Unpack[TupleAny]]
) -> _SelectIterable:
return [c for c in _select_iterables(statement._raw_columns)]
@classmethod
def _generate_for_statement(
- cls, select_stmt: Select[Unpack[Tuple[Any, ...]]]
+ cls, select_stmt: Select[Unpack[TupleAny]]
) -> None:
if select_stmt._setup_joins or select_stmt._with_options:
self = _MemoizedSelectEntities()
_compile_state_factory: Type[SelectState]
@classmethod
- def _create_raw_select(cls, **kw: Any) -> Select[Unpack[Tuple[Any, ...]]]:
+ def _create_raw_select(cls, **kw: Any) -> Select[Unpack[TupleAny]]:
"""Create a :class:`.Select` using raw ``__new__`` with no coercions.
Used internally to build up :class:`.Select` constructs with
@_generative
def add_columns(
self, *entities: _ColumnsClauseArgument[Any]
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
r"""Return a new :func:`_expression.select` construct with
the given entities appended to its columns clause.
)
def column(
self, column: _ColumnsClauseArgument[Any]
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
"""Return a new :func:`_expression.select` construct with
the given column expression added to its columns clause.
@util.preload_module("sqlalchemy.sql.util")
def reduce_columns(
self, only_synonyms: bool = True
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
"""Return a new :func:`_expression.select` construct with redundantly
named, equivalently-valued columns removed from the columns clause.
all columns that are equivalent to another are removed.
"""
- woc: Select[Unpack[Tuple[Any, ...]]]
+ woc: Select[Unpack[TupleAny]]
woc = self.with_only_columns(
*util.preloaded.sql_util.reduce_columns(
self._all_selected_columns,
*entities: _ColumnsClauseArgument[Any],
maintain_column_froms: bool = False,
**__kw: Any,
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
...
@_generative
*entities: _ColumnsClauseArgument[Any],
maintain_column_froms: bool = False,
**__kw: Any,
- ) -> Select[Unpack[Tuple[Any, ...]]]:
+ ) -> Select[Unpack[TupleAny]]:
r"""Return a new :func:`_expression.select` construct with its columns
clause replaced with the given entities.
meth = SelectState.get_plugin_class(self).all_selected_columns
return list(meth(self))
- def _ensure_disambiguated_names(self) -> Select[Unpack[Tuple[Any, ...]]]:
+ def _ensure_disambiguated_names(self) -> Select[Unpack[TupleAny]]:
if self._label_style is LABEL_STYLE_NONE:
self = self.set_label_style(LABEL_STYLE_DISAMBIGUATE_ONLY)
return self
by this :class:`_expression.ScalarSelect`.
"""
- self.element = cast(
- "Select[Unpack[Tuple[Any, ...]]]", self.element
- ).where(crit)
+ self.element = cast("Select[Unpack[TupleAny]]", self.element).where(
+ crit
+ )
return self
@overload
if TYPE_CHECKING:
- def _ungroup(self) -> Select[Unpack[Tuple[Any, ...]]]:
+ def _ungroup(self) -> Select[Unpack[TupleAny]]:
...
@_generative
"""
self.element = cast(
- "Select[Unpack[Tuple[Any, ...]]]", self.element
+ "Select[Unpack[TupleAny]]", self.element
).correlate(*fromclauses)
return self
"""
self.element = cast(
- "Select[Unpack[Tuple[Any, ...]]]", self.element
+ "Select[Unpack[TupleAny]]", self.element
).correlate_except(*fromclauses)
return self
inherit_cache = True
element: Union[
- SelectStatementGrouping[Select[Unpack[Tuple[Any, ...]]]],
+ SelectStatementGrouping[Select[Unpack[TupleAny]]],
ScalarSelect[Any],
]
def _regroup(
self,
- fn: Callable[
- [Select[Unpack[Tuple[Any, ...]]]], Select[Unpack[Tuple[Any, ...]]]
- ],
- ) -> SelectStatementGrouping[Select[Unpack[Tuple[Any, ...]]]]:
+ fn: Callable[[Select[Unpack[TupleAny]]], Select[Unpack[TupleAny]]],
+ ) -> SelectStatementGrouping[Select[Unpack[TupleAny]]]:
element = self.element._ungroup()
new_element = fn(element)
assert isinstance(return_value, SelectStatementGrouping)
return return_value
- def select(self) -> Select[Unpack[Tuple[Any, ...]]]:
+ def select(self) -> Select[Unpack[TupleAny]]:
r"""Return a SELECT of this :class:`_expression.Exists`.
e.g.::
from ..util import OrderedDict
from ..util.typing import is_literal
from ..util.typing import Literal
+from ..util.typing import TupleAny
from ..util.typing import typing_get_args
if TYPE_CHECKING:
)
-class TupleType(TypeEngine[Tuple[Any, ...]]):
+class TupleType(TypeEngine[TupleAny]):
"""represent the composite type of a Tuple."""
_is_tuple_type = True
_VT = TypeVar("_VT")
_VT_co = TypeVar("_VT_co", covariant=True)
+TupleAny = Tuple[Any, ...]
+
if compat.py310:
# why they took until py310 to put this in stdlib is beyond me,
def run(cmd: code_writer_cmd, update_year: bool):
i = 0
- for ext in ('py', 'pyx', 'pxd'):
+ for ext in ("py", "pyx", "pxd"):
for file in sa_path.glob(f"**/*.{ext}"):
run_file(cmd, file, update_year)
i += 1