from typing import cast
from typing import Dict
from typing import FrozenSet
+from typing import Generator
from typing import Generic
from typing import Iterable
from typing import Iterator
from ..util import HasMemoized as HasMemoized
from ..util import hybridmethod
from ..util import typing as compat_typing
+from ..util.typing import Final
from ..util.typing import Protocol
from ..util.typing import Self
from ..util.typing import TypeGuard
from ._orm_types import DMLStrategyArgument
from ._orm_types import SynchronizeSessionArgument
from ._typing import _CLE
+ from .cache_key import CacheKey
from .compiler import SQLCompiler
from .elements import BindParameter
from .elements import ClauseList
from .selectable import _JoinTargetElement
from .selectable import _SelectIterable
from .selectable import FromClause
+ from .visitors import anon_map
from ..engine import Connection
from ..engine import CursorResult
from ..engine.interfaces import _CoreMultiExecuteParams
return f"_NoArg.{self.name}"
-NO_ARG = _NoArg.NO_ARG
+NO_ARG: Final = _NoArg.NO_ARG
class _NoneName(Enum):
"""indicate a 'deferred' name that was ultimately the value None."""
-_NONE_NAME = _NoneName.NONE_NAME
+_NONE_NAME: Final = _NoneName.NONE_NAME
_T = TypeVar("_T", bound=Any)
)
-_never_select_column = operator.attrgetter("_omit_from_statements")
+_never_select_column: operator.attrgetter[Any] = operator.attrgetter(
+ "_omit_from_statements"
+)
class _EntityNamespace(Protocol):
__slots__ = ()
- _is_immutable = True
+ _is_immutable: bool = True
- def unique_params(self, *optionaldict, **kwargs):
+ def unique_params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
raise NotImplementedError("Immutable objects do not support copying")
- def params(self, *optionaldict, **kwargs):
+ def params(self, *optionaldict: Any, **kwargs: Any) -> NoReturn:
raise NotImplementedError("Immutable objects do not support copying")
def _clone(self: _Self, **kw: Any) -> _Self:
class SingletonConstant(Immutable):
"""Represent SQL constants like NULL, TRUE, FALSE"""
- _is_singleton_constant = True
+ _is_singleton_constant: bool = True
_singleton: SingletonConstant
raise NotImplementedError()
@classmethod
- def _create_singleton(cls):
+ def _create_singleton(cls) -> None:
obj = object.__new__(cls)
obj.__init__() # type: ignore
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
- msgs = kw.pop("msgs", {})
+ msgs: Dict[str, str] = kw.pop("msgs", {})
- defaults = kw.pop("defaults", {})
+ defaults: Dict[str, str] = kw.pop("defaults", {})
- getters = [
+ getters: List[Tuple[str, operator.attrgetter[Any], Optional[str]]] = [
(name, operator.attrgetter(name), defaults.get(name, None))
for name in names
]
@util.decorator
- def check(fn, *args, **kw):
+ def check(fn: _Fn, *args: Any, **kw: Any) -> Any:
# make pylance happy by not including "self" in the argument
# list
self = args[0]
The returned set is in terms of the entities present within 'a'.
"""
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
+ all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
+ _expand_cloned(b)
+ )
return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
+ all_overlap: Set[_CLE] = set(_expand_cloned(a)).intersection(
+ _expand_cloned(b)
+ )
return {
elem for elem in a if not all_overlap.intersection(elem._cloned_set)
}
__slots__ = ("obj",)
- def __init__(self, obj):
+ def __init__(self, obj: DialectKWArgs) -> None:
self.obj = obj
- def _key(self, key):
+ def _key(self, key: str) -> Tuple[str, str]:
try:
dialect, value_key = key.split("_", 1)
except ValueError as err:
else:
return dialect, value_key
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> Any:
dialect, value_key = self._key(key)
try:
else:
return opt[value_key]
- def __setitem__(self, key, value):
+ def __setitem__(self, key: str, value: Any) -> None:
try:
dialect, value_key = self._key(key)
except KeyError as err:
else:
self.obj.dialect_options[dialect][value_key] = value
- def __delitem__(self, key):
+ def __delitem__(self, key: str) -> None:
dialect, value_key = self._key(key)
del self.obj.dialect_options[dialect][value_key]
- def __len__(self):
+ def __len__(self) -> int:
return sum(
len(args._non_defaults)
for args in self.obj.dialect_options.values()
)
- def __iter__(self):
+ def __iter__(self) -> Generator[str, None, None]:
return (
"%s_%s" % (dialect_name, value_name)
for dialect_name in self.obj.dialect_options
"""
- def __init__(self):
- self._non_defaults = {}
- self._defaults = {}
+ def __init__(self) -> None:
+ self._non_defaults: Dict[str, Any] = {}
+ self._defaults: Dict[str, Any] = {}
- def __len__(self):
+ def __len__(self) -> int:
return len(set(self._non_defaults).union(self._defaults))
- def __iter__(self):
+ def __iter__(self) -> Iterator[str]:
return iter(set(self._non_defaults).union(self._defaults))
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> Any:
if key in self._non_defaults:
return self._non_defaults[key]
else:
return self._defaults[key]
- def __setitem__(self, key, value):
+ def __setitem__(self, key: str, value: Any) -> None:
self._non_defaults[key] = value
- def __delitem__(self, key):
+ def __delitem__(self, key: str) -> None:
del self._non_defaults[key]
@util.preload_module("sqlalchemy.dialects")
-def _kw_reg_for_dialect(dialect_name):
+def _kw_reg_for_dialect(dialect_name: str) -> Optional[Dict[Any, Any]]:
dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
if dialect_cls.construct_arguments is None:
return None
__slots__ = ()
- _dialect_kwargs_traverse_internals = [
+ _dialect_kwargs_traverse_internals: List[Tuple[str, Any]] = [
("dialect_options", InternalTraversal.dp_dialect_options)
]
@classmethod
- def argument_for(cls, dialect_name, argument_name, default):
+ def argument_for(
+ cls, dialect_name: str, argument_name: str, default: Any
+ ) -> None:
"""Add a new kind of dialect-specific keyword argument for this class.
E.g.::
"""
- construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
+ construct_arg_dictionary: Optional[Dict[Any, Any]] = (
+ DialectKWArgs._kw_registry[dialect_name]
+ )
if construct_arg_dictionary is None:
raise exc.ArgumentError(
"Dialect '%s' does have keyword-argument "
construct_arg_dictionary[cls][argument_name] = default
@property
- def dialect_kwargs(self):
+ def dialect_kwargs(self) -> _DialectArgView:
"""A collection of keyword arguments specified as dialect-specific
options to this construct.
return _DialectArgView(self)
@property
- def kwargs(self):
+ def kwargs(self) -> _DialectArgView:
"""A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
return self.dialect_kwargs
- _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
+ _kw_registry: util.PopulateDict[str, Optional[Dict[Any, Any]]] = (
+ util.PopulateDict(_kw_reg_for_dialect)
+ )
@classmethod
- def _kw_reg_for_dialect_cls(cls, dialect_name):
+ def _kw_reg_for_dialect_cls(cls, dialect_name: str) -> _DialectArgDict:
construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
d = _DialectArgDict()
return d
@util.memoized_property
- def dialect_options(self):
+ def dialect_options(self) -> util.PopulateDict[str, _DialectArgDict]:
"""A collection of keyword arguments specified as dialect-specific
options to this construct.
)
super().__init_subclass__()
- def __init__(self, **kw):
+ def __init__(self, **kw: Any) -> None:
self.__dict__.update(kw)
def __add__(self, other):
return False
return True
- def __repr__(self):
+ def __repr__(self) -> str:
# TODO: fairly inefficient, used only in debugging right now.
return "%s(%s)" % (
return issubclass(cls, klass)
@hybridmethod
- def add_to_element(self, name, value):
+ def add_to_element(self, name: str, value: str) -> Any:
return self + {name: getattr(self, name) + value}
@hybridmethod
return cls._state_dict_const
@classmethod
- def safe_merge(cls, other):
+ def safe_merge(cls, other: "Options") -> Any:
d = other._state_dict()
# only support a merge with another object of our class
@classmethod
def from_execution_options(
- cls, key, attrs, exec_options, statement_exec_options
- ):
+ cls,
+ key: str,
+ attrs: set[str],
+ exec_options: Mapping[str, Any],
+ statement_exec_options: Mapping[str, Any],
+ ) -> Tuple["Options", Mapping[str, Any]]:
"""process Options argument in terms of execution options.
result[local] = statement_exec_options[argname]
new_options = existing_options + result
- exec_options = util.immutabledict().merge_with(
- exec_options, {key: new_options}
+ exec_options = util.immutabledict(exec_options).merge_with(
+ {key: new_options}
)
return new_options, exec_options
__slots__ = ()
@hybridmethod
- def _gen_cache_key_inst(self, anon_map, bindparams):
+ def _gen_cache_key_inst(
+ self, anon_map: Any, bindparams: List[BindParameter[Any]]
+ ) -> Optional[Tuple[Any]]:
return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
@_gen_cache_key_inst.classlevel
- def _gen_cache_key(cls, anon_map, bindparams):
+ def _gen_cache_key(
+ cls, anon_map: "anon_map", bindparams: List[BindParameter[Any]]
+ ) -> Tuple[CacheableOptions, Any]:
return (cls, ())
@hybridmethod
- def _generate_cache_key(self):
+ def _generate_cache_key(self) -> Optional[CacheKey]:
return HasCacheKey._generate_cache_key_for_object(self)
class ExecutableOption(HasCopyInternals):
__slots__ = ()
- _annotations = util.EMPTY_DICT
+ _annotations: _ImmutableExecuteOptions = util.EMPTY_DICT
- __visit_name__ = "executable_option"
+ __visit_name__: str = "executable_option"
- _is_has_cache_key = False
+ _is_has_cache_key: bool = False
- _is_core = True
+ _is_core: bool = True
def _clone(self, **kw):
"""Create a shallow copy of this ExecutableOption."""
supports_execution: bool = True
_execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
- _is_default_generator = False
+ _is_default_generator: bool = False
_with_options: Tuple[ExecutableOption, ...] = ()
_with_context_options: Tuple[
Tuple[Callable[[CompileState], None], Any], ...
("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
]
- is_select = False
- is_from_statement = False
- is_update = False
- is_insert = False
- is_text = False
- is_delete = False
- is_dml = False
+ is_select: bool = False
+ is_from_statement: bool = False
+ is_update: bool = False
+ is_insert: bool = False
+ is_text: bool = False
+ is_delete: bool = False
+ is_dml: bool = False
if TYPE_CHECKING:
__visit_name__: str
) -> Any: ...
@util.ro_non_memoized_property
- def _all_selected_columns(self):
+ def _all_selected_columns(self) -> _SelectIterable:
raise NotImplementedError()
@property
"""
- __traverse_options__ = {"schema_visitor": True}
+ __traverse_options__: Dict[str, Any] = {"schema_visitor": True}
class _SentinelDefaultCharacterization(Enum):
def __init__(
self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
- ):
+ ) -> None:
self.column = col
# proxy_index being non-empty means it was initialized.
for eps_col in col._expanded_proxy_set:
pi[eps_col].add(self)
- def get_expanded_proxy_set(self):
+ def get_expanded_proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
return self.column._expanded_proxy_set
- def dispose(self, collection):
+ def dispose(self, collection: ColumnCollection[_COLKEY, _COL_co]) -> None:
pi = collection._proxy_index
if not pi:
return
"""
- __slots__ = "_collection", "_index", "_colset", "_proxy_index"
+ __slots__ = ("_collection", "_index", "_colset", "_proxy_index")
_collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
_index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
else:
return True
- def compare(self, other: ColumnCollection[Any, Any]) -> bool:
+ def compare(self, other: ColumnCollection[_COLKEY, _COL_co]) -> bool:
"""Compare this :class:`_expression.ColumnCollection` to another
based on the names of the keys"""
:class:`_sql.ColumnCollection`."""
raise NotImplementedError()
- def remove(self, column: Any) -> None:
+ def remove(self, column: Any) -> NoReturn:
raise NotImplementedError()
def update(self, iter_: Any) -> NoReturn:
raise NotImplementedError()
# https://github.com/python/mypy/issues/4266
- __hash__ = None # type: ignore
+ __hash__: Optional[int] = None # type: ignore
def _populate_separate_keys(
self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
return ReadOnlyColumnCollection(self)
- def _init_proxy_index(self):
+ def _init_proxy_index(self) -> None:
"""populate the "proxy index", if empty.
proxy index is added in 2.0 to provide more efficient operation
def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
self._populate_separate_keys((col.key, col) for col in iter_)
- def remove(self, column: _NAMEDCOL) -> None:
+ def remove(self, column: _NAMEDCOL) -> None: # type: ignore[override]
if column not in self._colset:
raise ValueError(
"Can't remove column %r; column is not in this collection"
):
__slots__ = ("_parent",)
- def __init__(self, collection):
+ def __init__(self, collection: ColumnCollection[_COLKEY, _COL_co]):
object.__setattr__(self, "_parent", collection)
object.__setattr__(self, "_colset", collection._colset)
object.__setattr__(self, "_index", collection._index)
object.__setattr__(self, "_collection", collection._collection)
object.__setattr__(self, "_proxy_index", collection._proxy_index)
- def __getstate__(self):
+ def __getstate__(self) -> Dict[str, _COL_co]:
return {"_parent": self._parent}
- def __setstate__(self, state):
+ def __setstate__(self, state: Dict[str, Any]) -> None:
parent = state["_parent"]
self.__init__(parent) # type: ignore
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
- def contains_column(self, col):
+ def contains_column(self, col: ColumnClause[Any]) -> bool:
return col in self
- def extend(self, cols):
+ def extend(self, cols: Iterable[Any]) -> None:
for col in cols:
self.add(col)
l.append(c == local)
return elements.and_(*l)
- def __hash__(self): # type: ignore[override]
+ def __hash__(self) -> int: # type: ignore[override]
return hash(tuple(x for x in self))