Fixed internal typing issues to establish compatibility with mypy 1.11.0.
Note that this does not include issues which have arisen with the
deprecated mypy plugin used by SQLAlchemy 1.4-style code; see the addiional
change note for this plugin indicating revised compatibility.
The legacy mypy plugin is no longer fully functional with the latest series
of mypy 1.11.0, as changes in the mypy interpreter are no longer compatible
with the approach used by the plugin. If code is dependent on the legacy
mypy plugin with sqlalchemy2-stubs, it's recommended to pin mypy to be
below the 1.11.0 series. Seek upgrading to the 2.0 series of SQLAlchemy
and migrating to the modern type annotations.
Change-Id: Ib8fef93ede588430dc0f7ed44ef887649a415821
--- /dev/null
+.. change::
+ :tags: bug, mypy
+ :versions: 2.0
+
+ The deprecated mypy plugin is no longer fully functional with the latest
+ series of mypy 1.11.0, as changes in the mypy interpreter are no longer
+ compatible with the approach used by the plugin. If code is dependent on
+ the mypy plugin with sqlalchemy2-stubs, it's recommended to pin mypy to be
+ below the 1.11.0 series. Seek upgrading to the 2.0 series of SQLAlchemy
+ and migrating to the modern type annotations.
+
+ .. seealso::
+
+ :ref:`mypy_toplevel`
--- /dev/null
+.. change::
+ :tags: bug, mypy
+
+ Fixed internal typing issues to establish compatibility with mypy 1.11.0.
+ Note that this does not include issues which have arisen with the
+ deprecated mypy plugin used by SQLAlchemy 1.4-style code; see the addiional
+ change note for this plugin indicating revised compatibility.
**The SQLAlchemy Mypy Plugin is DEPRECATED, and will be removed possibly
as early as the SQLAlchemy 2.1 release. We would urge users to please
- migrate away from it ASAP.**
+ migrate away from it ASAP. The mypy plugin also works only up until
+ mypy version 1.10.1. version 1.11.0 and greater may not work properly.**
This plugin cannot be maintained across constantly changing releases
of mypy and its stability going forward CANNOT be guaranteed.
.. topic:: SQLAlchemy Mypy Plugin Status Update
- **Updated July 2023**
+ **Updated July 2024**
+
+ The mypy plugin is supported **only up until mypy 1.10.1, and it will have
+ issues running with 1.11.0 or greater**. Use with mypy 1.11.0 or greater
+ may have error conditions which currently cannot be resolved.
For SQLAlchemy 2.0, the Mypy plugin continues to work at the level at which
it reached in the SQLAlchemy 1.4 release. SQLAlchemy 2.0 however features
"""
raise NotImplementedError()
- @classmethod
- def type_descriptor(cls, typeobj: TypeEngine[_T]) -> TypeEngine[_T]:
+ def type_descriptor(self, typeobj: TypeEngine[_T]) -> TypeEngine[_T]:
"""Transform a generic type to a dialect-specific type.
Dialect classes will usually use the
def get_multi_columns(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_pk_constraint(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_foreign_keys(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_indexes(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_unique_constraints(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_check_constraints(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_table_options(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
def get_multi_table_comment(
self,
connection: Connection,
+ *,
schema: Optional[str] = None,
filter_names: Optional[Collection[str]] = None,
**kw: Any,
"name": self.name,
"line": self.line,
"column": self.column,
- "type": self.type.serialize(),
+ "type": serialize_type(self.type),
}
def expand_typevar_from_subtype(self, sub_type: TypeInfo) -> None:
return sym.node
return cls.info
+
+
+def serialize_type(typ: Type) -> Union[str, JsonDict]:
+ try:
+ return typ.serialize()
+ except Exception:
+ pass
+ if hasattr(typ, "args"):
+ typ.args = tuple(
+ (
+ a.resolve_string_annotation()
+ if hasattr(a, "resolve_string_annotation")
+ else a
+ )
+ for a in typ.args
+ )
+ elif hasattr(typ, "resolve_string_annotation"):
+ typ = typ.resolve_string_annotation()
+ return typ.serialize()
elif isinstance(self.prop.composite_class, type) and isinstance(
value, self.prop.composite_class
):
- values = self.prop._composite_values_from_instance(value)
+ values = self.prop._composite_values_from_instance(
+ value # type: ignore[arg-type]
+ )
else:
raise sa_exc.ArgumentError(
"Can't UPDATE composite attribute %s to %r"
from ..sql import coercions
from ..sql import expression
from ..sql import roles
+from ..util.langhelpers import Missing
+from ..util.langhelpers import MissingOr
from ..util.typing import Literal
if TYPE_CHECKING:
_KT = TypeVar("_KT", bound=Any)
_VT = TypeVar("_VT", bound=Any)
-_F = TypeVar("_F", bound=Callable[[Any], Any])
-
class _PlainColumnGetter(Generic[_KT]):
"""Plain column getter, stores collection of Column objects
def _cols(self, mapper: Mapper[_KT]) -> Sequence[ColumnElement[_KT]]:
return self.cols
- def __call__(self, value: _KT) -> Union[_KT, Tuple[_KT, ...]]:
+ def __call__(self, value: _KT) -> MissingOr[Union[_KT, Tuple[_KT, ...]]]:
state = base.instance_state(value)
m = base._state_mapper(state)
else:
obj = key[0]
if obj is None:
- return _UNMAPPED_AMBIGUOUS_NONE
+ return Missing
else:
return obj
)
-_UNMAPPED_AMBIGUOUS_NONE = object()
-
-
class _AttrGetter:
__slots__ = ("attr_name", "getter")
dict_ = state.dict
obj = dict_.get(self.attr_name, base.NO_VALUE)
if obj is None:
- return _UNMAPPED_AMBIGUOUS_NONE
+ return Missing
else:
- return _UNMAPPED_AMBIGUOUS_NONE
+ return Missing
return obj
def keyfunc_mapping(
- keyfunc: _F,
+ keyfunc: Callable[[Any], Any],
*,
ignore_unpopulated_attribute: bool = False,
) -> Type[KeyFuncDict[_KT, Any]]:
def __init__(
self,
- keyfunc: _F,
+ keyfunc: Callable[[Any], Any],
*dict_args: Any,
ignore_unpopulated_attribute: bool = False,
) -> None:
@classmethod
def _unreduce(
cls,
- keyfunc: _F,
+ keyfunc: Callable[[Any], Any],
values: Dict[_KT, _KT],
adapter: Optional[CollectionAdapter] = None,
) -> "KeyFuncDict[_KT, _KT]":
)
else:
return
- elif key is _UNMAPPED_AMBIGUOUS_NONE:
+ elif key is Missing:
if not self.ignore_unpopulated_attribute:
self._raise_for_unpopulated(
value, _sa_initiator, warn_only=True
value, _sa_initiator, warn_only=False
)
return
- elif key is _UNMAPPED_AMBIGUOUS_NONE:
+ elif key is Missing:
if not self.ignore_unpopulated_attribute:
self._raise_for_unpopulated(
value, _sa_initiator, warn_only=True
def _mapped_collection_cls(
- keyfunc: _F, ignore_unpopulated_attribute: bool
+ keyfunc: Callable[[Any], Any], ignore_unpopulated_attribute: bool
) -> Type[KeyFuncDict[_KT, _KT]]:
class _MKeyfuncMapped(KeyFuncDict[_KT, _KT]):
def __init__(self, *dict_args: Any) -> None:
)
@overload
- def as_scalar(
+ def as_scalar( # type: ignore[overload-overlap]
self: Query[Tuple[_MAYBE_ENTITY]],
) -> ScalarSelect[_MAYBE_ENTITY]: ...
c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
"""An alias for :attr:`.Bundle.columns`."""
- def _clone(self):
+ def _clone(self, **kw):
cloned = self.__class__.__new__(self.__class__)
cloned.__dict__.update(self.__dict__)
return cloned
l.append(c == local)
return elements.and_(*l)
- def __hash__(self):
+ def __hash__(self): # type: ignore[override]
return hash(tuple(x for x in self))
element: Any,
argname: Optional[str] = None,
resolved: Optional[Any] = None,
+ *,
advice: Optional[str] = None,
code: Optional[str] = None,
err: Optional[Exception] = None,
class _NoTextCoercion(RoleImpl):
__slots__ = ()
- def _literal_coercion(self, element, argname=None, **kw):
+ def _literal_coercion(self, element, *, argname=None, **kw):
if isinstance(element, str) and issubclass(
elements.TextClause, self._role_class
):
def _text_coercion(self, element, argname=None):
return _no_text_coercion(element, argname)
- def _literal_coercion(self, element, argname=None, **kw):
+ def _literal_coercion(self, element, *, argname=None, **kw):
if isinstance(element, str):
if self._coerce_star and element == "*":
return elements.ColumnClause("*", is_literal=True)
self,
element,
resolved,
- argname,
+ argname=None,
+ *,
type_=None,
literal_execute=False,
**kw,
literal_execute=literal_execute,
)
- def _literal_coercion(self, element, argname=None, type_=None, **kw):
+ def _literal_coercion(self, element, **kw):
return element
element: Any,
argname: Optional[str] = None,
resolved: Optional[Any] = None,
+ *,
advice: Optional[str] = None,
code: Optional[str] = None,
err: Optional[Exception] = None,
__slots__ = ()
def _literal_coercion(
- self, element, name=None, type_=None, argname=None, is_crud=False, **kw
+ self, element, *, name=None, type_=None, is_crud=False, **kw
):
if (
element is None
class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
__slots__ = ()
- def _literal_coercion(
- self, element, expr, operator, bindparam_type=None, argname=None, **kw
+ def _literal_coercion( # type: ignore[override]
+ self,
+ element,
+ *,
+ expr,
+ operator,
+ bindparam_type=None,
+ argname=None,
+ **kw,
):
try:
return expr._bind_param(operator, element, type_=bindparam_type)
except exc.ArgumentError as err:
self._raise_for_expected(element, err=err)
- def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
+ def _post_coercion(self, resolved, *, expr, bindparam_type=None, **kw):
if resolved.type._isnull and not expr.type._isnull:
resolved = resolved._with_binary_element_type(
bindparam_type if bindparam_type is not None else expr.type
% (elem.__class__.__name__)
)
- def _literal_coercion(self, element, expr, operator, **kw):
+ def _literal_coercion( # type: ignore[override]
+ self, element, *, expr, operator, **kw
+ ):
if util.is_non_string_iterable(element):
non_literal_expressions: Dict[
Optional[operators.ColumnOperators],
else:
self._raise_for_expected(element, **kw)
- def _post_coercion(self, element, expr, operator, **kw):
+ def _post_coercion(self, element, *, expr, operator, **kw):
if element._is_select_base:
# for IN, we are doing scalar_subquery() coercion without
# a warning
_coerce_consts = True
- def _literal_coercion(
- self, element, name=None, type_=None, argname=None, is_crud=False, **kw
- ):
+ def _literal_coercion(self, element, **kw):
self._raise_for_expected(element)
- def _post_coercion(self, resolved, original_element=None, **kw):
+ def _post_coercion(self, resolved, *, original_element=None, **kw):
# this is a hack right now as we want to use coercion on an
# ORM InstrumentedAttribute, but we want to return the object
# itself if it is one, not its clause element.
class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
__slots__ = ()
- def _post_coercion(self, element, as_key=False, **kw):
+ def _post_coercion(self, element, *, as_key=False, **kw):
if as_key:
return element.key
else:
class ConstExprImpl(RoleImpl):
__slots__ = ()
- def _literal_coercion(self, element, argname=None, **kw):
+ def _literal_coercion(self, element, *, argname=None, **kw):
if element is None:
return elements.Null()
elif element is False:
else:
self._raise_for_expected(element, argname, resolved)
- def _literal_coercion(self, element, argname=None, **kw):
+ def _literal_coercion(self, element, **kw):
"""coerce the given value to :class:`._truncated_label`.
Existing :class:`._truncated_label` and
else:
self._raise_for_expected(element, argname, resolved)
- def _literal_coercion(self, element, name, type_, **kw):
+ def _literal_coercion( # type: ignore[override]
+ self, element, *, name, type_, **kw
+ ):
if element is None:
return None
else:
_guess_straight_column = re.compile(r"^\w\S*$", re.I)
def _raise_for_expected(
- self, element, argname=None, resolved=None, advice=None, **kw
+ self, element, argname=None, resolved=None, *, advice=None, **kw
):
if not advice and isinstance(element, list):
advice = (
class StatementImpl(_CoerceLiterals, RoleImpl):
__slots__ = ()
- def _post_coercion(self, resolved, original_element, argname=None, **kw):
+ def _post_coercion(
+ self, resolved, *, original_element, argname=None, **kw
+ ):
if resolved is not original_element and not isinstance(
original_element, str
):
_skip_clauseelement_for_target_match = True
- def _literal_coercion(self, element, argname=None, **kw):
+ def _literal_coercion(self, element, *, argname=None, **kw):
self._raise_for_expected(element, argname)
def _implicit_coercions(
element: Any,
resolved: Any,
argname: Optional[str] = None,
+ *,
legacy: bool = False,
**kw: Any,
) -> Any:
element: Any,
resolved: Any,
argname: Optional[str] = None,
+ *,
explicit_subquery: bool = False,
allow_select: bool = True,
**kw: Any,
else:
self._raise_for_expected(element, argname, resolved)
- def _post_coercion(self, element, deannotate=False, **kw):
+ def _post_coercion(self, element, *, deannotate=False, **kw):
if deannotate:
return element._deannotate()
else:
element: Any,
resolved: Any,
argname: Optional[str] = None,
- explicit_subquery: bool = False,
+ *,
allow_select: bool = False,
**kw: Any,
) -> Any:
class AnonymizedFromClauseImpl(StrictFromClauseImpl):
__slots__ = ()
- def _post_coercion(self, element, flat=False, name=None, **kw):
+ def _post_coercion(self, element, *, flat=False, name=None, **kw):
assert name is None
return element._anonymous_fromclause(flat=flat)
def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
return self.visit_getitem_binary(binary, operator, **kw)
- def visit_sequence(self, seq, **kw):
- return "<next sequence value: %s>" % self.preparer.format_sequence(seq)
+ def visit_sequence(self, sequence, **kw):
+ return (
+ f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
+ )
def returning_clause(
self,
for t in extra_froms
)
- def visit_empty_set_expr(self, type_, **kw):
+ def visit_empty_set_expr(self, element_types, **kw):
return "SELECT 1 WHERE 1!=1"
def get_from_hint_text(self, table, text):
def compare(self, other, **kw):
raise NotImplementedError()
- def _copy_internals(self, other, **kw):
+ def _copy_internals(self, **kw):
raise NotImplementedError()
def __eq__(self, other):
self.type = type_api.NULLTYPE
def self_group(self, against: Optional[OperatorType] = None) -> Self:
- assert against is operator.getitem # type: ignore[comparison-overlap]
+ assert against is operator.getitem
return self
if _adapted_from:
self.dispatch = self.dispatch._join(_adapted_from.dispatch)
- def _set_parent(self, column, **kw):
+ def _set_parent(self, parent, **kw):
# set parent hook is when this type is associated with a column.
# Column calls it for all SchemaEventTarget instances, either the
# base type and/or variants in _variant_mapping.
# on_table/metadata_create/drop in this method, which is used by
# "native" types with a separate CREATE/DROP e.g. Postgresql.ENUM
- column._on_table_attach(util.portable_instancemethod(self._set_table))
+ parent._on_table_attach(util.portable_instancemethod(self._set_table))
def _variant_mapping_for_set_table(self, column):
if column.type._variant_mapping:
assert "_enums" in kw
return impltype(**kw)
- def adapt(self, impltype, **kw):
+ def adapt(self, cls, **kw):
kw["_enums"] = self._enums_argument
kw["_disable_warnings"] = True
- return super().adapt(impltype, **kw)
+ return super().adapt(cls, **kw)
def _should_create_constraint(self, compiler, **kw):
if not self._is_impl_for_variant(compiler.dialect, kw):
def compare_values(self, x, y):
return x == y
- def _set_parent(self, column, outer=False, **kw):
+ def _set_parent(self, parent, outer=False, **kw):
"""Support SchemaEventTarget"""
if not outer and isinstance(self.item_type, SchemaEventTarget):
- self.item_type._set_parent(column, **kw)
+ self.item_type._set_parent(parent, **kw)
- def _set_parent_with_dispatch(self, parent):
+ def _set_parent_with_dispatch(self, parent, **kw):
"""Support SchemaEventTarget"""
super()._set_parent_with_dispatch(parent, outer=True)
varkw: Optional[str]
defaults: Optional[Tuple[Any, ...]]
kwonlyargs: List[str]
- kwonlydefaults: Dict[str, Any]
+ kwonlydefaults: Optional[Dict[str, Any]]
annotations: Dict[str, Any]
assert py_spec.loader
py_spec.loader.exec_module(py_module)
return cast(_M, py_module)
+
+
+class _Missing(enum.Enum):
+ Missing = enum.auto()
+
+
+Missing = _Missing.Missing
+MissingOr = Union[_T, Literal[_Missing.Missing]]
import os
+import pathlib
import shutil
from sqlalchemy import testing
class MypyPluginTest(fixtures.MypyTest):
@testing.combinations(
- *[(pathname) for pathname in _incremental_dirs()],
+ *[
+ (pathlib.Path(pathname).name, pathname)
+ for pathname in _incremental_dirs()
+ ],
argnames="pathname",
+ id_="ia",
)
@testing.requires.patch_library
def test_incremental(self, mypy_runner, per_func_cachedir, pathname):
[testenv:pep484]
deps=
greenlet != 0.4.17
- mypy >= 1.7.0,<1.11.0 # temporary, REMOVE upper bound
+ mypy >= 1.7.0
types-greenlet
commands =
mypy {env:MYPY_COLOR} ./lib/sqlalchemy