From: Mike Bayer Date: Sun, 7 Sep 2025 00:20:00 +0000 (-0400) Subject: doc updates, localize test fixtures X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=cf3880fc8544dd1f4ae457d302c67e908f96308c;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git doc updates, localize test fixtures testing with types is inherently awkward and subject to changes in python interpreters (such as all the recent python 3.14 stuff we had them fix), but in this suite we already have a lot of types that are defined inline inside of test methods. so since that's how many of the tests work anyway, organize the big series of pep-695 and pep-593 structures into fixtures or individual tests to make the whole suite easier to follow. pyright complains quite a lot about this, so if this becomes a bigger issue for say mypy /pep484 target, we may have to revisit (which I'd likely do with more ignores) or if function/method-local type declarations with global becomes a runtime issue in py3.15 or something, we can revisit then where we would in theory need to convert the entire suite, which I'd do with a more consistent naming style for everything. but for now try to go with fixtures / local type declarations so that we dont have to wonder where all these types are used. For 2.0 this further repairs some merge mismatches between 2.1 and 2.0, with tests lining up more closely to where they were placed in 2.1. Change-Id: Ibe8f447eaa10f5e927b1122c8b608f11a5f5bc97 (cherry picked from commit 8f7138326cf48a1394417aa492ad083b3400c529) --- diff --git a/doc/build/changelog/unreleased_20/12829.rst b/doc/build/changelog/unreleased_20/12829.rst index f307545c3a..5dd8d3e9d4 100644 --- a/doc/build/changelog/unreleased_20/12829.rst +++ b/doc/build/changelog/unreleased_20/12829.rst @@ -4,19 +4,20 @@ The way ORM Annotated Declarative interprets Python :pep:`695` type aliases in ``Mapped[]`` annotations has been refined to expand the lookup scheme. A - PEP 695 type can now be resolved based on either its direct presence in + :pep:`695` type can now be resolved based on either its direct presence in :paramref:`_orm.registry.type_annotation_map` or its immediate resolved - value, as long as a recursive lookup across multiple pep-695 types is not - required for it to resolve. This change reverses part of the restrictions - introduced in 2.0.37 as part of :ticket:`11955`, which deprecated (and - disallowed in 2.1) the ability to resolve any PEP 695 type that was not - explicitly present in :paramref:`_orm.registry.type_annotation_map`. - Recursive lookups of PEP 695 types remains deprecated in 2.0 and disallowed - in version 2.1, as do implicit lookups of ``NewType`` types without an - entry in :paramref:`_orm.registry.type_annotation_map`. + value, as long as a recursive lookup across multiple :pep:`695` types is + not required for it to resolve. This change reverses part of the + restrictions introduced in 2.0.37 as part of :ticket:`11955`, which + deprecated (and disallowed in 2.1) the ability to resolve any :pep:`695` + type that was not explicitly present in + :paramref:`_orm.registry.type_annotation_map`. Recursive lookups of + :pep:`695` types remains deprecated in 2.0 and disallowed in version 2.1, + as do implicit lookups of ``NewType`` types without an entry in + :paramref:`_orm.registry.type_annotation_map`. - Additionally, new support has been added for generic PEP 695 aliases that - refer to PEP 593 ``Annotated`` constructs containing + Additionally, new support has been added for generic :pep:`695` aliases that + refer to :pep:`593` ``Annotated`` constructs containing :func:`_orm.mapped_column` configurations. See the sections below for examples. diff --git a/doc/build/orm/declarative_tables.rst b/doc/build/orm/declarative_tables.rst index 5761d8ab29..0dd2a01a8f 100644 --- a/doc/build/orm/declarative_tables.rst +++ b/doc/build/orm/declarative_tables.rst @@ -1229,7 +1229,7 @@ adding a ``FOREIGN KEY`` constraint as well as substituting .. _orm_declarative_mapped_column_generic_pep593: Mapping Whole Column Declarations to Generic Python Types -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Using the ``Annotated`` approach from the previous section, we may also create a generic version that will apply particular :func:`_orm.mapped_column` @@ -1266,12 +1266,13 @@ The above type can now apply ``primary_key=True`` to any Python type:: # will create a UUID primary key id: Mapped[PrimaryKey[uuid.UUID]] -The type alias may also be defined equivalently using the pep-695 ``type`` -keyword in Python 3.12 or above:: +For a more shorthand approach, we may opt to use the :pep:`695` ``type`` +keyword (Python 3.12 or above) which allows us to skip having to define a +``TypeVar`` variable:: type PrimaryKey[T] = Annotated[T, mapped_column(primary_key=True)] -.. versionadded:: 2.0.44 Generic pep-695 types may be used with pep-593 +.. versionadded:: 2.0.44 Generic :pep:`695` types may be used with :pep:`593` ``Annotated`` elements to create generic types that automatically deliver :func:`_orm.mapped_column` arguments. diff --git a/test/orm/declarative/test_tm_future_annotations_sync.py b/test/orm/declarative/test_tm_future_annotations_sync.py index e4b4435e9e..b62a4667d7 100644 --- a/test/orm/declarative/test_tm_future_annotations_sync.py +++ b/test/orm/declarative/test_tm_future_annotations_sync.py @@ -33,7 +33,6 @@ import uuid import typing_extensions from typing_extensions import get_args as get_args from typing_extensions import Literal as Literal -from typing_extensions import TypeAlias as TypeAlias from typing_extensions import TypeAliasType from typing_extensions import TypedDict @@ -105,81 +104,82 @@ from sqlalchemy.testing.fixtures import fixture_session from sqlalchemy.util import compat from sqlalchemy.util.typing import Annotated -TV = typing.TypeVar("TV") + +# try to differentiate between typing_extensions.TypeAliasType +# and typing.TypeAliasType +TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType) -class _SomeDict1(TypedDict): - type: Literal["1"] +@testing.fixture +def pep_695_types(): + global TV + global _UnionPep695, _TypingGenericPep695, _TypingStrPep695 + global _TypingGenericPep695, _TypingGenericPep695Typed + global _TypingLiteral695 + global _Literal695, _RecursiveLiteral695 + global _StrPep695, _GenericPep695, _GenericPep695Typed, _GenericPep695Typed + TV = typing.TypeVar("TV") -class _SomeDict2(TypedDict): - type: Literal["2"] + _StrPep695 = TypeAliasType("_StrPep695", str) # type: ignore + _GenericPep695 = TypeAliasType( # type: ignore + "_GenericPep695", List[TV], type_params=(TV,) + ) + _GenericPep695Typed = _GenericPep695[int] + class _SomeDict1(TypedDict): + type: Literal["1"] -_UnionTypeAlias: TypeAlias = Union[_SomeDict1, _SomeDict2] + class _SomeDict2(TypedDict): + type: Literal["2"] -_StrTypeAlias: TypeAlias = str + _Literal695 = TypeAliasType( # type: ignore + "_Literal695", Literal["to-do", "in-progress", "done"] + ) + _RecursiveLiteral695 = TypeAliasType( # type: ignore + "_RecursiveLiteral695", _Literal695 + ) + _UnionPep695 = TypeAliasType( # type: ignore + "_UnionPep695", Union[_SomeDict1, _SomeDict2] + ) + _TypingStrPep695 = TypingTypeAliasType( # type: ignore + "_TypingStrPep695", str + ) -if compat.py38: - _TypingLiteral = typing.Literal["a", "b"] -_TypingExtensionsLiteral = typing_extensions.Literal["a", "b"] + _TypingGenericPep695 = TypingTypeAliasType( # type: ignore + "_TypingGenericPep695", List[TV], type_params=(TV,) # type: ignore + ) -_JsonPrimitive: TypeAlias = Union[str, int, float, bool, None] -_JsonObject: TypeAlias = Dict[str, "_Json"] -_JsonArray: TypeAlias = List["_Json"] -_Json: TypeAlias = Union[_JsonObject, _JsonArray, _JsonPrimitive] + _TypingGenericPep695Typed = _TypingGenericPep695[int] # type: ignore -if compat.py310: - _JsonPrimitivePep604: TypeAlias = str | int | float | bool | None - _JsonObjectPep604: TypeAlias = dict[str, "_JsonPep604"] - _JsonArrayPep604: TypeAlias = list["_JsonPep604"] - _JsonPep604: TypeAlias = ( - _JsonObjectPep604 | _JsonArrayPep604 | _JsonPrimitivePep604 + _TypingLiteral695 = TypingTypeAliasType( # type: ignore + "_TypingLiteral695", Literal["to-do", "in-progress", "done"] ) - _JsonPep695 = TypeAliasType("_JsonPep695", _JsonPep604) -TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType) -_StrPep695 = TypeAliasType("_StrPep695", str) -_TypingStrPep695 = TypingTypeAliasType("_TypingStrPep695", str) -_GenericPep695 = TypeAliasType("_GenericPep695", List[TV], type_params=(TV,)) -_TypingGenericPep695 = TypingTypeAliasType( - "_TypingGenericPep695", List[TV], type_params=(TV,) -) -_GenericPep695Typed = _GenericPep695[int] -_TypingGenericPep695Typed = _TypingGenericPep695[int] -_UnionPep695 = TypeAliasType("_UnionPep695", Union[_SomeDict1, _SomeDict2]) -strtypalias_keyword = TypeAliasType( - "strtypalias_keyword", Annotated[str, mapped_column(info={"hi": "there"})] -) -if compat.py310: - strtypalias_keyword_nested = TypeAliasType( - "strtypalias_keyword_nested", - int | Annotated[str, mapped_column(info={"hi": "there"})], +@testing.fixture +def pep_593_types(pep_695_types): + global _GenericPep593TypeAlias, _GenericPep593Pep695 + global _RecursivePep695Pep593 + + _GenericPep593TypeAlias = Annotated[ + TV, mapped_column(info={"hi": "there"}) # type: ignore + ] + + _GenericPep593Pep695 = TypingTypeAliasType( # type: ignore + "_GenericPep593Pep695", + Annotated[TV, mapped_column(info={"hi": "there"})], # type: ignore + type_params=(TV,), + ) + + _RecursivePep695Pep593 = TypingTypeAliasType( # type: ignore + "_RecursivePep695Pep593", + Annotated[ + _TypingStrPep695, # type: ignore + mapped_column(info={"hi": "there"}), + ], ) -strtypalias_ta: TypeAlias = Annotated[str, mapped_column(info={"hi": "there"})] -strtypalias_plain = Annotated[str, mapped_column(info={"hi": "there"})] -_Literal695 = TypeAliasType( - "_Literal695", Literal["to-do", "in-progress", "done"] -) -_TypingLiteral695 = TypingTypeAliasType( - "_TypingLiteral695", Literal["to-do", "in-progress", "done"] -) -_RecursiveLiteral695 = TypeAliasType("_RecursiveLiteral695", _Literal695) - -_GenericPep593TypeAlias = Annotated[TV, mapped_column(info={"hi": "there"})] - -_GenericPep593Pep695 = TypingTypeAliasType( - "_GenericPep593Pep695", - Annotated[TV, mapped_column(info={"hi": "there"})], - type_params=(TV,), -) - -_RecursivePep695Pep593 = TypingTypeAliasType( - "_RecursivePep695Pep593", - Annotated[_TypingStrPep695, mapped_column(info={"hi": "there"})], -) def expect_annotation_syntax_error(name): @@ -223,15 +223,6 @@ class DeclarativeBaseTest(fixtures.TestBase): Tab.non_existent -_annotated_names_tested = set() - - -def annotated_name_test_cases(*cases, **kw): - _annotated_names_tested.update([case[0] for case in cases]) - - return testing.combinations_list(cases, **kw) - - class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @@ -636,7 +627,7 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): anno_str = Annotated[str, 50] anno_str_optional = Annotated[Optional[str], 30] - newtype_str = NewType("MyType", str) + newtype_str = NewType("newtype_str", str) anno_str_mc = Annotated[str, mapped_column()] anno_str_optional_mc = Annotated[Optional[str], mapped_column()] @@ -775,6 +766,11 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): def test_typing_literal_identity(self, decl_base): """See issue #11820""" + global _TypingLiteral, _TypingExtensionsLiteral + + _TypingLiteral = typing.Literal["a", "b"] + _TypingExtensionsLiteral = typing_extensions.Literal["a", "b"] + class Foo(decl_base): __tablename__ = "footable" @@ -787,162 +783,187 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): eq_(col.type.enums, ["a", "b"]) is_(col.type.native_enum, False) - @testing.requires.python310 - def test_we_got_all_attrs_test_annotated(self): - argnames = _py_inspect.getfullargspec(mapped_column) - assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( - f"annotated attributes were not tested: " - f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" - ) - - @annotated_name_test_cases( - ("sort_order", 100, lambda sort_order: sort_order == 100), - ("nullable", False, lambda column: column.nullable is False), - ( - "active_history", - True, - lambda column_property: column_property.active_history is True, - ), - ( - "deferred", - True, - lambda column_property: column_property.deferred is True, - ), - ( - "deferred", - _NoArg.NO_ARG, - lambda column_property: column_property is None, - ), - ( - "deferred_group", - "mygroup", - lambda column_property: column_property.deferred is True - and column_property.group == "mygroup", - ), - ( - "deferred_raiseload", - True, - lambda column_property: column_property.deferred is True - and column_property.raiseload is True, - ), - ( - "server_default", - "25", - lambda column: column.server_default.arg == "25", - ), - ( - "server_onupdate", - "25", - lambda column: column.server_onupdate.arg == "25", - ), - ( - "default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "insert_default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "onupdate", - 25, - lambda column: column.onupdate.arg == 25, - ), - ("doc", "some doc", lambda column: column.doc == "some doc"), - ( - "comment", - "some comment", - lambda column: column.comment == "some comment", - ), - ("index", True, lambda column: column.index is True), - ("index", _NoArg.NO_ARG, lambda column: column.index is None), - ("index", False, lambda column: column.index is False), - ("unique", True, lambda column: column.unique is True), - ("unique", False, lambda column: column.unique is False), - ("autoincrement", True, lambda column: column.autoincrement is True), - ("system", True, lambda column: column.system is True), - ("primary_key", True, lambda column: column.primary_key is True), - ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), - ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), - ( - "use_existing_column", - True, - lambda mc: mc._use_existing_column is True, - ), - ( - "quote", - True, - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + @staticmethod + def annotated_name_test_cases(): + return [ + ("sort_order", 100, lambda sort_order: sort_order == 100), + ("nullable", False, lambda column: column.nullable is False), + ( + "active_history", + True, + lambda column_property: column_property.active_history is True, ), - ), - ( - "key", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + ( + "deferred", + True, + lambda column_property: column_property.deferred is True, ), - ), - ( - "name", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + ( + "deferred", + _NoArg.NO_ARG, + lambda column_property: column_property is None, ), - ), - ( - "kw_only", - True, - exc.SADeprecationWarning( - "Argument 'kw_only' is a dataclass argument " + ( + "deferred_group", + "mygroup", + lambda column_property: column_property.deferred is True + and column_property.group == "mygroup", ), - testing.requires.python310, - ), - ( - "compare", - True, - exc.SADeprecationWarning( - "Argument 'compare' is a dataclass argument " + ( + "deferred_raiseload", + True, + lambda column_property: column_property.deferred is True + and column_property.raiseload is True, ), - testing.requires.python310, - ), - ( - "default_factory", - lambda: 25, - exc.SADeprecationWarning( - "Argument 'default_factory' is a dataclass argument " + ( + "server_default", + "25", + lambda column: column.server_default.arg == "25", ), - ), - ( - "repr", - True, - exc.SADeprecationWarning( - "Argument 'repr' is a dataclass argument " + ( + "server_onupdate", + "25", + lambda column: column.server_onupdate.arg == "25", ), - ), - ( - "init", - True, - exc.SADeprecationWarning( - "Argument 'init' is a dataclass argument" + ( + "default", + 25, + lambda column: column.default.arg == 25, ), - ), - ( - "hash", - True, - exc.SADeprecationWarning( - "Argument 'hash' is a dataclass argument" + ( + "insert_default", + 25, + lambda column: column.default.arg == 25, ), - ), - ( - "dataclass_metadata", - {}, - exc.SADeprecationWarning( - "Argument 'dataclass_metadata' is a dataclass argument" + ( + "onupdate", + 25, + lambda column: column.onupdate.arg == 25, ), - ), + ("doc", "some doc", lambda column: column.doc == "some doc"), + ( + "comment", + "some comment", + lambda column: column.comment == "some comment", + ), + ("index", True, lambda column: column.index is True), + ("index", _NoArg.NO_ARG, lambda column: column.index is None), + ("index", False, lambda column: column.index is False), + ("unique", True, lambda column: column.unique is True), + ("unique", False, lambda column: column.unique is False), + ( + "autoincrement", + True, + lambda column: column.autoincrement is True, + ), + ("system", True, lambda column: column.system is True), + ("primary_key", True, lambda column: column.primary_key is True), + ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), + ( + "info", + {"foo": "bar"}, + lambda column: column.info == {"foo": "bar"}, + ), + ( + "use_existing_column", + True, + lambda mc: mc._use_existing_column is True, + ), + ( + "quote", + True, + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "key", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "name", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "kw_only", + True, + exc.SADeprecationWarning( + "Argument 'kw_only' is a dataclass argument " + ), + testing.requires.python310, + ), + ( + "compare", + True, + exc.SADeprecationWarning( + "Argument 'compare' is a dataclass argument " + ), + testing.requires.python310, + ), + ( + "default_factory", + lambda: 25, + exc.SADeprecationWarning( + "Argument 'default_factory' is a dataclass argument " + ), + ), + ( + "repr", + True, + exc.SADeprecationWarning( + "Argument 'repr' is a dataclass argument " + ), + ), + ( + "init", + True, + exc.SADeprecationWarning( + "Argument 'init' is a dataclass argument" + ), + ), + ( + "hash", + True, + exc.SADeprecationWarning( + "Argument 'hash' is a dataclass argument" + ), + ), + ( + "dataclass_metadata", + {}, + exc.SADeprecationWarning( + "Argument 'dataclass_metadata' is a dataclass argument" + ), + ), + ] + + if not compat.py310: + annotated_name_test_cases = annotated_name_test_cases.__func__ + static_annotated_name_test_cases = staticmethod( + annotated_name_test_cases + ) + else: + static_annotated_name_test_cases = annotated_name_test_cases + + def test_we_got_all_attrs_test_annotated(self): + argnames = _py_inspect.getfullargspec(mapped_column) + _annotated_names_tested = { + case[0] for case in self.static_annotated_name_test_cases() + } + assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( + f"annotated attributes were not tested: " + f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" + ) + + @testing.requires.python310 + @testing.combinations_list( + annotated_name_test_cases(), argnames="argname, argument, assertion", ) @testing.variation("use_annotated", [True, False, "control"]) @@ -1060,369 +1081,106 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): else: is_(result, orig) - @testing.variation( - "union", - [ - "union", - ("pep604", requires.python310), - "union_null", - ("pep604_null", requires.python310), - ], - ) - def test_unions(self, union): - global UnionType - our_type = Numeric(10, 2) + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" + + global T + + T = Dict[str, Optional["T"]] + + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): + + class TypeTest(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + + return - if union.union: - UnionType = Union[float, Decimal] - elif union.union_null: - UnionType = Union[float, Decimal, None] - elif union.pep604: - UnionType = float | Decimal - elif union.pep604_null: - UnionType = float | Decimal | None else: - union.fail() + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) - class Base(DeclarativeBase): - type_annotation_map = {UnionType: our_type} + class TypeTest(decl_base): + __tablename__ = "my_table" - class User(Base): + id: Mapped[int] = mapped_column(primary_key=True) + + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) + + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) + + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", + ) + + def test_missing_mapped_lhs(self, decl_base): + with expect_annotation_syntax_error("User.name"): + + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: str = mapped_column() # type: ignore + + def test_construct_lhs_separate_name(self, decl_base): + class User(decl_base): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + data: Mapped[Optional[str]] = mapped_column("the_data") - data: Mapped[Union[float, Decimal]] - reverse_data: Mapped[Union[Decimal, float]] + self.assert_compile( + select(User.data), "SELECT users.the_data FROM users" + ) + is_true(User.__table__.c.the_data.nullable) - optional_data: Mapped[Optional[Union[float, Decimal]]] = ( - mapped_column() - ) + def test_construct_works_in_expr(self, decl_base): + class User(decl_base): + __tablename__ = "users" - # use Optional directly - reverse_optional_data: Mapped[Optional[Union[Decimal, float]]] = ( - mapped_column() - ) + id: Mapped[int] = mapped_column(primary_key=True) - # use Union with None, same as Optional but presents differently - # (Optional object with __origin__ Union vs. Union) - reverse_u_optional_data: Mapped[Union[Decimal, float, None]] = ( - mapped_column() - ) + class Address(decl_base): + __tablename__ = "addresses" - refer_union: Mapped[UnionType] - refer_union_optional: Mapped[Optional[UnionType]] + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - # py38, 37 does not automatically flatten unions, add extra tests - # for this. maintain these in order to catch future regressions - # in the behavior of ``Union`` - unflat_union_optional_data: Mapped[ - Union[Union[Decimal, float, None], None] - ] = mapped_column() + user = relationship(User, primaryjoin=user_id == User.id) - float_data: Mapped[float] = mapped_column() - decimal_data: Mapped[Decimal] = mapped_column() - - if compat.py310: - pep604_data: Mapped[float | Decimal] = mapped_column() - pep604_reverse: Mapped[Decimal | float] = mapped_column() - pep604_optional: Mapped[Decimal | float | None] = ( - mapped_column() - ) - pep604_data_fwd: Mapped["float | Decimal"] = mapped_column() - pep604_reverse_fwd: Mapped["Decimal | float"] = mapped_column() - pep604_optional_fwd: Mapped["Decimal | float | None"] = ( - mapped_column() - ) - - info = [ - ("data", False), - ("reverse_data", False), - ("optional_data", True), - ("reverse_optional_data", True), - ("reverse_u_optional_data", True), - ("refer_union", "null" in union.name), - ("refer_union_optional", True), - ("unflat_union_optional_data", True), - ] - if compat.py310: - info += [ - ("pep604_data", False), - ("pep604_reverse", False), - ("pep604_optional", True), - ("pep604_data_fwd", False), - ("pep604_reverse_fwd", False), - ("pep604_optional_fwd", True), - ] - - for name, nullable in info: - col = User.__table__.c[name] - is_(col.type, our_type, name) - is_(col.nullable, nullable, name) - - is_true(isinstance(User.__table__.c.float_data.type, Float)) - ne_(User.__table__.c.float_data.type, our_type) - - is_true(isinstance(User.__table__.c.decimal_data.type, Numeric)) - ne_(User.__table__.c.decimal_data.type, our_type) - - @testing.variation( - "union", - [ - "union", - ("pep604", requires.python310), - ("pep695", requires.python312), - ], - ) - def test_optional_in_annotation_map(self, union): - """See issue #11370""" - - class Base(DeclarativeBase): - if union.union: - type_annotation_map = {_Json: JSON} - elif union.pep604: - type_annotation_map = {_JsonPep604: JSON} - elif union.pep695: - type_annotation_map = {_JsonPep695: JSON} # noqa: F821 - else: - union.fail() - - class A(Base): - __tablename__ = "a" - - id: Mapped[int] = mapped_column(primary_key=True) - if union.union: - json1: Mapped[_Json] - json2: Mapped[_Json] = mapped_column(nullable=False) - elif union.pep604: - json1: Mapped[_JsonPep604] - json2: Mapped[_JsonPep604] = mapped_column(nullable=False) - elif union.pep695: - json1: Mapped[_JsonPep695] # noqa: F821 - json2: Mapped[_JsonPep695] = mapped_column( # noqa: F821 - nullable=False - ) - else: - union.fail() - - is_(A.__table__.c.json1.type._type_affinity, JSON) - is_(A.__table__.c.json2.type._type_affinity, JSON) - is_true(A.__table__.c.json1.nullable) - is_false(A.__table__.c.json2.nullable) - - @testing.variation( - "option", - [ - "not_optional", - "optional", - "optional_fwd_ref", - "union_none", - ("pep604", testing.requires.python310), - ("pep604_fwd_ref", testing.requires.python310), - ], - ) - @testing.variation("brackets", ["oneset", "twosets"]) - @testing.combinations( - "include_mc_type", "derive_from_anno", argnames="include_mc_type" - ) - def test_optional_styles_nested_brackets( - self, option, brackets, include_mc_type - ): - """composed types test, includes tests that were added later for - #12207""" - - class Base(DeclarativeBase): - if testing.requires.python310.enabled: - type_annotation_map = { - Dict[str, Decimal]: JSON, - dict[str, Decimal]: JSON, - Union[List[int], List[str]]: JSON, - list[int] | list[str]: JSON, - } - else: - type_annotation_map = { - Dict[str, Decimal]: JSON, - Union[List[int], List[str]]: JSON, - } - - if include_mc_type == "include_mc_type": - mc = mapped_column(JSON) - mc2 = mapped_column(JSON) - else: - mc = mapped_column() - mc2 = mapped_column() - - class A(Base): - __tablename__ = "a" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = mapped_column() - - if brackets.oneset: - if option.not_optional: - json: Mapped[Dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 - if testing.requires.python310.enabled: - json2: Mapped[dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 - elif option.optional: - json: Mapped[Optional[Dict[str, Decimal]]] = mc - if testing.requires.python310.enabled: - json2: Mapped[Optional[dict[str, Decimal]]] = mc2 - elif option.optional_fwd_ref: - json: Mapped["Optional[Dict[str, Decimal]]"] = mc - if testing.requires.python310.enabled: - json2: Mapped["Optional[dict[str, Decimal]]"] = mc2 - elif option.union_none: - json: Mapped[Union[Dict[str, Decimal], None]] = mc - json2: Mapped[Union[None, Dict[str, Decimal]]] = mc2 - elif option.pep604: - json: Mapped[dict[str, Decimal] | None] = mc - if testing.requires.python310.enabled: - json2: Mapped[None | dict[str, Decimal]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["dict[str, Decimal] | None"] = mc - if testing.requires.python310.enabled: - json2: Mapped["None | dict[str, Decimal]"] = mc2 - elif brackets.twosets: - if option.not_optional: - json: Mapped[Union[List[int], List[str]]] = mapped_column() # type: ignore # noqa: E501 - elif option.optional: - json: Mapped[Optional[Union[List[int], List[str]]]] = mc - if testing.requires.python310.enabled: - json2: Mapped[ - Optional[Union[list[int], list[str]]] - ] = mc2 - elif option.optional_fwd_ref: - json: Mapped["Optional[Union[List[int], List[str]]]"] = mc - if testing.requires.python310.enabled: - json2: Mapped[ - "Optional[Union[list[int], list[str]]]" - ] = mc2 - elif option.union_none: - json: Mapped[Union[List[int], List[str], None]] = mc - if testing.requires.python310.enabled: - json2: Mapped[Union[None, list[int], list[str]]] = mc2 - elif option.pep604: - json: Mapped[list[int] | list[str] | None] = mc - json2: Mapped[None | list[int] | list[str]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["list[int] | list[str] | None"] = mc - json2: Mapped["None | list[int] | list[str]"] = mc2 - else: - brackets.fail() - - is_(A.__table__.c.json.type._type_affinity, JSON) - if hasattr(A, "json2"): - is_(A.__table__.c.json2.type._type_affinity, JSON) - if option.not_optional: - is_false(A.__table__.c.json2.nullable) - else: - is_true(A.__table__.c.json2.nullable) - - if option.not_optional: - is_false(A.__table__.c.json.nullable) - else: - is_true(A.__table__.c.json.nullable) - - @testing.variation("optional", [True, False]) - @testing.variation("provide_type", [True, False]) - @testing.variation("add_to_type_map", [True, False]) - def test_recursive_type( - self, decl_base, optional, provide_type, add_to_type_map - ): - """test #9553""" - - global T - - T = Dict[str, Optional["T"]] - - if not provide_type and not add_to_type_map: - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", - ): - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - - return - - else: - if add_to_type_map: - decl_base.registry.update_type_annotation_map({T: JSON()}) - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if add_to_type_map: - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - else: - if optional: - type_test: Mapped[Optional[T]] = mapped_column(JSON()) - else: - type_test: Mapped[T] = mapped_column(JSON()) - - if optional: - is_(TypeTest.__table__.c.type_test.nullable, True) - else: - is_(TypeTest.__table__.c.type_test.nullable, False) - - self.assert_compile( - select(TypeTest), - "SELECT my_table.id, my_table.type_test FROM my_table", - ) - - def test_missing_mapped_lhs(self, decl_base): - with expect_annotation_syntax_error("User.name"): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: str = mapped_column() # type: ignore - - def test_construct_lhs_separate_name(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - data: Mapped[Optional[str]] = mapped_column("the_data") - - self.assert_compile( - select(User.data), "SELECT users.the_data FROM users" - ) - is_true(User.__table__.c.the_data.nullable) - - def test_construct_works_in_expr(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - - class Address(decl_base): - __tablename__ = "addresses" - - id: Mapped[int] = mapped_column(primary_key=True) - user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - - user = relationship(User, primaryjoin=user_id == User.id) - - self.assert_compile( - select(Address.user_id, User.id).join(Address.user), - "SELECT addresses.user_id, users.id FROM addresses " - "JOIN users ON addresses.user_id = users.id", - ) + self.assert_compile( + select(Address.user_id, User.id).join(Address.user), + "SELECT addresses.user_id, users.id FROM addresses " + "JOIN users ON addresses.user_id = users.id", + ) def test_construct_works_as_polymorphic_on(self, decl_base): class User(decl_base): @@ -1551,6 +1309,21 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): self, decl_base: Type[DeclarativeBase], alias_type ): """test #11130""" + + global strtypalias_keyword, strtypalias_keyword_nested + global strtypalias_ta, strtypalias_plain + + strtypalias_keyword = TypeAliasType( + "strtypalias_keyword", + Annotated[str, mapped_column(info={"hi": "there"})], + ) + strtypalias_keyword_nested = TypeAliasType( + "strtypalias_keyword_nested", + int | Annotated[str, mapped_column(info={"hi": "there"})], + ) + strtypalias_ta = Annotated[str, mapped_column(info={"hi": "there"})] + strtypalias_plain = Annotated[str, mapped_column(info={"hi": "there"})] + if alias_type.typekeyword: decl_base.registry.update_type_annotation_map( {strtypalias_keyword: VARCHAR(33)} # noqa: F821 @@ -1594,60 +1367,28 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): @testing.requires.python312 def test_no_recursive_pep593_from_pep695( - self, decl_base: Type[DeclarativeBase] + self, decl_base: Type[DeclarativeBase], pep_593_types ): + def declare(): class MyClass(decl_base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 - - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type when resolving for Python " - r"type " - r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " - r"annotation for the 'data_one' attribute; none of " - r"'_RecursivePep695Pep593', " - r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " - r"are resolvable by the registry", - ): - declare() - - @testing.variation("in_map", [True, False]) - @testing.variation("alias_type", ["plain", "pep695"]) - @testing.requires.python312 - def test_generic_typealias_pep593( - self, decl_base: Type[DeclarativeBase], alias_type: Variation, in_map - ): - - if in_map: - decl_base.registry.update_type_annotation_map( - { - _GenericPep593TypeAlias[str]: VARCHAR(33), - _GenericPep593Pep695[str]: VARCHAR(33), - } - ) - - class MyClass(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if alias_type.plain: - data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 - elif alias_type.pep695: - data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 - else: - alias_type.fail() - - eq_(MyClass.data_one.expression.info, {"hi": "there"}) - if in_map: - eq_(MyClass.data_one.expression.type.length, 33) - else: - eq_(MyClass.data_one.expression.type.length, None) + data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 + + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving for Python " + r"type " + r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " + r"annotation for the 'data_one' attribute; none of " + r"'_RecursivePep695Pep593', " + r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " + r"are resolvable by the registry", + ): + declare() def test_extract_base_type_from_pep593( self, decl_base: Type[DeclarativeBase] @@ -2174,6 +1915,43 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): eq_(A_1.label.property.columns[0].table, A.__table__) eq_(A_2.label.property.columns[0].table, A.__table__) + @testing.variation("in_map", [True, False]) + @testing.variation("alias_type", ["plain", "pep695"]) + @testing.requires.python312 + def test_generic_typealias_pep593( + self, + decl_base: Type[DeclarativeBase], + alias_type: Variation, + in_map, + pep_593_types, + ): + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _GenericPep593TypeAlias[str]: VARCHAR(33), + _GenericPep593Pep695[str]: VARCHAR(33), + } + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + if alias_type.plain: + data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 + elif alias_type.pep695: + data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 + else: + alias_type.fail() + + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + if in_map: + eq_(MyClass.data_one.expression.type.length, 33) + else: + eq_(MyClass.data_one.expression.type.length, None) + class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @@ -2563,6 +2341,18 @@ class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): def test_plain_typealias_as_typemap_keys( self, decl_base: Type[DeclarativeBase] ): + + global _StrTypeAlias, _UnionTypeAlias + + class _SomeDict1(TypedDict): + type: Literal["1"] + + class _SomeDict2(TypedDict): + type: Literal["2"] + + _StrTypeAlias = str + _UnionTypeAlias = Union[_SomeDict1, _SomeDict2] + decl_base.registry.update_type_annotation_map( {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} ) @@ -2671,117 +2461,454 @@ class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): return @testing.variation( - "type_", + "type_", + [ + "str_extension", + "str_typing", + "generic_extension", + "generic_typing", + "generic_typed_extension", + "generic_typed_typing", + ], + ) + @testing.requires.python312 + def test_pep695_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase], type_, pep_695_types + ): + """test #10807, #12829""" + + decl_base.registry.update_type_annotation_map( + { + _UnionPep695: JSON, + _StrPep695: String(30), + _TypingStrPep695: String(30), + _GenericPep695: String(30), + _TypingGenericPep695: String(30), + _GenericPep695Typed: String(30), + _TypingGenericPep695Typed: String(30), + } + ) + + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + if type_.str_extension: + data: Mapped[_StrPep695] + elif type_.str_typing: + data: Mapped[_TypingStrPep695] + elif type_.generic_extension: + data: Mapped[_GenericPep695] + elif type_.generic_typing: + data: Mapped[_TypingGenericPep695] + elif type_.generic_typed_extension: + data: Mapped[_GenericPep695Typed] + elif type_.generic_typed_typing: + data: Mapped[_TypingGenericPep695Typed] + else: + type_.fail() + structure: Mapped[_UnionPep695] + + eq_(Test.__table__.c.data.type._type_affinity, String) + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) + + def test_pep484_newtypes_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + global str50, str30, str3050 + + str50 = NewType("str50", str) + str30 = NewType("str30", str) + str3050 = NewType("str30", str50) + + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30), str3050: String(150)} + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[str50] + data_three: Mapped[Optional[str30]] + data_four: Mapped[str3050] + + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) + + eq_(MyClass.__table__.c.data_two.type.length, 50) + is_false(MyClass.__table__.c.data_two.nullable) + + eq_(MyClass.__table__.c.data_three.type.length, 30) + is_true(MyClass.__table__.c.data_three.nullable) + + eq_(MyClass.__table__.c.data_four.type.length, 150) + is_false(MyClass.__table__.c.data_four.nullable) + + def test_newtype_missing_from_map(self, decl_base): + global str50 + + str50 = NewType("str50", str) + + if compat.py310: + text = ".*str50" + else: + # NewTypes before 3.10 had a very bad repr + # .new_type at 0x...> + text = ".*NewType.*" + + with expect_deprecated( + f"Matching the provided NewType '{text}' on its " + "resolved value without matching it in the " + "type_annotation_map is deprecated; add this type to the " + "type_annotation_map to allow it to match explicitly.", + ): + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + data_one: Mapped[str50] + + is_true(isinstance(MyClass.data_one.type, String)) + + @testing.variation( + "union", + [ + "union", + ("pep604", requires.python310), + "union_null", + ("pep604_null", requires.python310), + ], + ) + def test_unions(self, union): + global UnionType + our_type = Numeric(10, 2) + + if union.union: + UnionType = Union[float, Decimal] + elif union.union_null: + UnionType = Union[float, Decimal, None] + elif union.pep604: + UnionType = float | Decimal + elif union.pep604_null: + UnionType = float | Decimal | None + else: + union.fail() + + class Base(DeclarativeBase): + type_annotation_map = {UnionType: our_type} + + class User(Base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + + data: Mapped[Union[float, Decimal]] + reverse_data: Mapped[Union[Decimal, float]] + + optional_data: Mapped[Optional[Union[float, Decimal]]] = ( + mapped_column() + ) + + # use Optional directly + reverse_optional_data: Mapped[Optional[Union[Decimal, float]]] = ( + mapped_column() + ) + + # use Union with None, same as Optional but presents differently + # (Optional object with __origin__ Union vs. Union) + reverse_u_optional_data: Mapped[Union[Decimal, float, None]] = ( + mapped_column() + ) + + refer_union: Mapped[UnionType] + refer_union_optional: Mapped[Optional[UnionType]] + + # py38, 37 does not automatically flatten unions, add extra tests + # for this. maintain these in order to catch future regressions + # in the behavior of ``Union`` + unflat_union_optional_data: Mapped[ + Union[Union[Decimal, float, None], None] + ] = mapped_column() + + float_data: Mapped[float] = mapped_column() + decimal_data: Mapped[Decimal] = mapped_column() + + if compat.py310: + pep604_data: Mapped[float | Decimal] = mapped_column() + pep604_reverse: Mapped[Decimal | float] = mapped_column() + pep604_optional: Mapped[Decimal | float | None] = ( + mapped_column() + ) + pep604_data_fwd: Mapped["float | Decimal"] = mapped_column() + pep604_reverse_fwd: Mapped["Decimal | float"] = mapped_column() + pep604_optional_fwd: Mapped["Decimal | float | None"] = ( + mapped_column() + ) + + info = [ + ("data", False), + ("reverse_data", False), + ("optional_data", True), + ("reverse_optional_data", True), + ("reverse_u_optional_data", True), + ("refer_union", "null" in union.name), + ("refer_union_optional", True), + ("unflat_union_optional_data", True), + ] + if compat.py310: + info += [ + ("pep604_data", False), + ("pep604_reverse", False), + ("pep604_optional", True), + ("pep604_data_fwd", False), + ("pep604_reverse_fwd", False), + ("pep604_optional_fwd", True), + ] + + for name, nullable in info: + col = User.__table__.c[name] + is_(col.type, our_type, name) + is_(col.nullable, nullable, name) + + is_true(isinstance(User.__table__.c.float_data.type, Float)) + ne_(User.__table__.c.float_data.type, our_type) + + is_true(isinstance(User.__table__.c.decimal_data.type, Numeric)) + ne_(User.__table__.c.decimal_data.type, our_type) + + @testing.variation( + "union", [ - "str_extension", - "str_typing", - "generic_extension", - "generic_typing", - "generic_typed_extension", - "generic_typed_typing", + "union", + ("pep604", requires.python310), + ("pep695", requires.python312), ], ) - @testing.requires.python312 - def test_pep695_typealias_as_typemap_keys( - self, decl_base: Type[DeclarativeBase], type_ - ): - """test #10807, #12829""" + def test_optional_in_annotation_map(self, union): + """See issue #11370""" - decl_base.registry.update_type_annotation_map( - { - _UnionPep695: JSON, - _StrPep695: String(30), - _TypingStrPep695: String(30), - _GenericPep695: String(30), - _TypingGenericPep695: String(30), - _GenericPep695Typed: String(30), - _TypingGenericPep695Typed: String(30), - } - ) + global _Json, _JsonPep604, _JsonPep695 + + _JsonPrimitive = Union[str, int, float, bool, None] + _JsonObject = Dict[str, "_Json"] + _JsonArray = List["_Json"] + _Json = Union[_JsonObject, _JsonArray, _JsonPrimitive] + if requires.python310.enabled: + _JsonPrimitivePep604 = str | int | float | bool | None + _JsonObjectPep604 = dict[str, "_JsonPep604"] + _JsonArrayPep604 = list["_JsonPep604"] + _JsonPep604 = ( + _JsonObjectPep604 | _JsonArrayPep604 | _JsonPrimitivePep604 + ) + _JsonPep695 = TypeAliasType("_JsonPep695", _JsonPep604) + + class Base(DeclarativeBase): + if union.union: + type_annotation_map = {_Json: JSON} + elif union.pep604: + type_annotation_map = {_JsonPep604: JSON} + elif union.pep695: + type_annotation_map = {_JsonPep695: JSON} # noqa: F821 + else: + union.fail() + + class A(Base): + __tablename__ = "a" - class Test(decl_base): - __tablename__ = "test" id: Mapped[int] = mapped_column(primary_key=True) - if type_.str_extension: - data: Mapped[_StrPep695] - elif type_.str_typing: - data: Mapped[_TypingStrPep695] - elif type_.generic_extension: - data: Mapped[_GenericPep695] - elif type_.generic_typing: - data: Mapped[_TypingGenericPep695] - elif type_.generic_typed_extension: - data: Mapped[_GenericPep695Typed] - elif type_.generic_typed_typing: - data: Mapped[_TypingGenericPep695Typed] + if union.union: + json1: Mapped[_Json] + json2: Mapped[_Json] = mapped_column(nullable=False) + elif union.pep604: + json1: Mapped[_JsonPep604] + json2: Mapped[_JsonPep604] = mapped_column(nullable=False) + elif union.pep695: + json1: Mapped[_JsonPep695] # noqa: F821 + json2: Mapped[_JsonPep695] = mapped_column( # noqa: F821 + nullable=False + ) else: - type_.fail() - structure: Mapped[_UnionPep695] + union.fail() - eq_(Test.__table__.c.data.type._type_affinity, String) - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + is_(A.__table__.c.json1.type._type_affinity, JSON) + is_(A.__table__.c.json2.type._type_affinity, JSON) + is_true(A.__table__.c.json1.nullable) + is_false(A.__table__.c.json2.nullable) - def test_pep484_newtypes_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] + @testing.variation( + "option", + [ + "not_optional", + "optional", + "optional_fwd_ref", + "union_none", + ("pep604", testing.requires.python310), + ("pep604_fwd_ref", testing.requires.python310), + ], + ) + @testing.variation("brackets", ["oneset", "twosets"]) + @testing.combinations( + "include_mc_type", "derive_from_anno", argnames="include_mc_type" + ) + def test_optional_styles_nested_brackets( + self, option, brackets, include_mc_type ): - global str50, str30, str3050 + """composed types test, includes tests that were added later for + #12207""" - str50 = NewType("str50", str) - str30 = NewType("str30", str) - str3050 = NewType("str30", str50) + class Base(DeclarativeBase): + if testing.requires.python310.enabled: + type_annotation_map = { + Dict[str, Decimal]: JSON, + dict[str, Decimal]: JSON, + Union[List[int], List[str]]: JSON, + list[int] | list[str]: JSON, + } + else: + type_annotation_map = { + Dict[str, Decimal]: JSON, + Union[List[int], List[str]]: JSON, + } - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30), str3050: String(150)} - ) + if include_mc_type == "include_mc_type": + mc = mapped_column(JSON) + mc2 = mapped_column(JSON) + else: + mc = mapped_column() + mc2 = mapped_column() - class MyClass(decl_base): - __tablename__ = "my_table" + class A(Base): + __tablename__ = "a" - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[str50] - data_three: Mapped[Optional[str30]] - data_four: Mapped[str3050] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = mapped_column() - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + if brackets.oneset: + if option.not_optional: + json: Mapped[Dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 + if testing.requires.python310.enabled: + json2: Mapped[dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 + elif option.optional: + json: Mapped[Optional[Dict[str, Decimal]]] = mc + if testing.requires.python310.enabled: + json2: Mapped[Optional[dict[str, Decimal]]] = mc2 + elif option.optional_fwd_ref: + json: Mapped["Optional[Dict[str, Decimal]]"] = mc + if testing.requires.python310.enabled: + json2: Mapped["Optional[dict[str, Decimal]]"] = mc2 + elif option.union_none: + json: Mapped[Union[Dict[str, Decimal], None]] = mc + json2: Mapped[Union[None, Dict[str, Decimal]]] = mc2 + elif option.pep604: + json: Mapped[dict[str, Decimal] | None] = mc + if testing.requires.python310.enabled: + json2: Mapped[None | dict[str, Decimal]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["dict[str, Decimal] | None"] = mc + if testing.requires.python310.enabled: + json2: Mapped["None | dict[str, Decimal]"] = mc2 + elif brackets.twosets: + if option.not_optional: + json: Mapped[Union[List[int], List[str]]] = mapped_column() # type: ignore # noqa: E501 + elif option.optional: + json: Mapped[Optional[Union[List[int], List[str]]]] = mc + if testing.requires.python310.enabled: + json2: Mapped[ + Optional[Union[list[int], list[str]]] + ] = mc2 + elif option.optional_fwd_ref: + json: Mapped["Optional[Union[List[int], List[str]]]"] = mc + if testing.requires.python310.enabled: + json2: Mapped[ + "Optional[Union[list[int], list[str]]]" + ] = mc2 + elif option.union_none: + json: Mapped[Union[List[int], List[str], None]] = mc + if testing.requires.python310.enabled: + json2: Mapped[Union[None, list[int], list[str]]] = mc2 + elif option.pep604: + json: Mapped[list[int] | list[str] | None] = mc + json2: Mapped[None | list[int] | list[str]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["list[int] | list[str] | None"] = mc + json2: Mapped["None | list[int] | list[str]"] = mc2 + else: + brackets.fail() - eq_(MyClass.__table__.c.data_two.type.length, 50) - is_false(MyClass.__table__.c.data_two.nullable) + is_(A.__table__.c.json.type._type_affinity, JSON) + if hasattr(A, "json2"): + is_(A.__table__.c.json2.type._type_affinity, JSON) + if option.not_optional: + is_false(A.__table__.c.json2.nullable) + else: + is_true(A.__table__.c.json2.nullable) - eq_(MyClass.__table__.c.data_three.type.length, 30) - is_true(MyClass.__table__.c.data_three.nullable) + if option.not_optional: + is_false(A.__table__.c.json.nullable) + else: + is_true(A.__table__.c.json.nullable) - eq_(MyClass.__table__.c.data_four.type.length, 150) - is_false(MyClass.__table__.c.data_four.nullable) + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" - def test_newtype_missing_from_map(self, decl_base): - global str50 + global T - str50 = NewType("str50", str) + T = Dict[str, Optional["T"]] - if compat.py310: - text = ".*str50" - else: - # NewTypes before 3.10 had a very bad repr - # .new_type at 0x...> - text = ".*NewType.*" + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): + + class TypeTest(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() - with expect_deprecated( - f"Matching the provided NewType '{text}' on its " - "resolved value without matching it in the " - "type_annotation_map is deprecated; add this type to the " - "type_annotation_map to allow it to match explicitly.", - ): + return - class MyClass(decl_base): + else: + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) + + class TypeTest(decl_base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[str50] - is_true(isinstance(MyClass.data_one.type, String)) + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) + + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) + + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", + ) class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): @@ -2915,6 +3042,119 @@ class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_(MyClass.__table__.c.data.type.enum_class, FooEnum) eq_(MyClass.__table__.c.data.type.name, "fooenum") # and not 'enum' + @testing.variation( + "type_", + [ + "literal", + "literal_typing", + "recursive", + "not_literal", + "not_literal_typing", + "generic", + "generic_typing", + "generic_typed", + "generic_typed_typing", + ], + ) + @testing.combinations(True, False, argnames="in_map") + @testing.requires.python312 + def test_pep695_literal_defaults_to_enum( + self, decl_base, type_, in_map, pep_695_types + ): + """test #11305.""" + + def declare(): + class Foo(decl_base): + __tablename__ = "footable" + + id: Mapped[int] = mapped_column(primary_key=True) + if type_.recursive: + status: Mapped[_RecursiveLiteral695] # noqa: F821 + elif type_.literal: + status: Mapped[_Literal695] # noqa: F821 + elif type_.literal_typing: + status: Mapped[_TypingLiteral695] # noqa: F821 + elif type_.not_literal: + status: Mapped[_StrPep695] # noqa: F821 + elif type_.not_literal_typing: + status: Mapped[_TypingStrPep695] # noqa: F821 + elif type_.generic: + status: Mapped[_GenericPep695] # noqa: F821 + elif type_.generic_typing: + status: Mapped[_TypingGenericPep695] # noqa: F821 + elif type_.generic_typed: + status: Mapped[_GenericPep695Typed] # noqa: F821 + elif type_.generic_typed_typing: + status: Mapped[_TypingGenericPep695Typed] # noqa: F821 + else: + type_.fail() + + return Foo + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _Literal695: Enum(enum.Enum), # noqa: F821 + _TypingLiteral695: Enum(enum.Enum), # noqa: F821 + _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 + _StrPep695: Enum(enum.Enum), # noqa: F821 + _TypingStrPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 + } + ) + if type_.recursive: + with expect_deprecated( + "Mapping recursive TypeAliasType '.+' that resolve to " + "literal to generate an Enum is deprecated. SQLAlchemy " + "2.1 will not support this use case. Please avoid using " + "recursing TypeAliasType", + ): + Foo = declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + else: + with expect_raises_message( + exc.ArgumentError, + "Can't associate TypeAliasType '.+' to an Enum " + "since it's not a direct alias of a Literal. Only " + "aliases in this form `type my_alias = Literal.'a', " + "'b'.` are supported when generating Enums.", + ): + declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + elif type_.not_literal or type_.not_literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, String)) + elif type_.recursive: + with expect_deprecated( + "Matching to pep-695 type '_Literal695' in a " + "recursive fashion " + "without the recursed type being present in the " + "type_annotation_map is deprecated; add this type or its " + "recursed value to the type_annotation_map to allow it to " + "match explicitly." + ): + Foo = declare() + else: + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.+' inside the Mapped\[\] " + r"annotation for the 'status' attribute", + ): + declare() + return + @testing.variation( "sqltype", [ @@ -3091,117 +3331,6 @@ class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(isinstance(Foo.__table__.c.status.type, JSON)) - @testing.variation( - "type_", - [ - "literal", - "literal_typing", - "recursive", - "not_literal", - "not_literal_typing", - "generic", - "generic_typing", - "generic_typed", - "generic_typed_typing", - ], - ) - @testing.combinations(True, False, argnames="in_map") - @testing.requires.python312 - def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): - """test #11305.""" - - def declare(): - class Foo(decl_base): - __tablename__ = "footable" - - id: Mapped[int] = mapped_column(primary_key=True) - if type_.recursive: - status: Mapped[_RecursiveLiteral695] # noqa: F821 - elif type_.literal: - status: Mapped[_Literal695] # noqa: F821 - elif type_.literal_typing: - status: Mapped[_TypingLiteral695] # noqa: F821 - elif type_.not_literal: - status: Mapped[_StrPep695] # noqa: F821 - elif type_.not_literal_typing: - status: Mapped[_TypingStrPep695] # noqa: F821 - elif type_.generic: - status: Mapped[_GenericPep695] # noqa: F821 - elif type_.generic_typing: - status: Mapped[_TypingGenericPep695] # noqa: F821 - elif type_.generic_typed: - status: Mapped[_GenericPep695Typed] # noqa: F821 - elif type_.generic_typed_typing: - status: Mapped[_TypingGenericPep695Typed] # noqa: F821 - else: - type_.fail() - - return Foo - - if in_map: - decl_base.registry.update_type_annotation_map( - { - _Literal695: Enum(enum.Enum), # noqa: F821 - _TypingLiteral695: Enum(enum.Enum), # noqa: F821 - _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 - _StrPep695: Enum(enum.Enum), # noqa: F821 - _TypingStrPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 - } - ) - if type_.recursive: - with expect_deprecated( - "Mapping recursive TypeAliasType '.+' that resolve to " - "literal to generate an Enum is deprecated. SQLAlchemy " - "2.1 will not support this use case. Please avoid using " - "recursing TypeAliasType", - ): - Foo = declare() - elif type_.literal or type_.literal_typing: - Foo = declare() - else: - with expect_raises_message( - exc.ArgumentError, - "Can't associate TypeAliasType '.+' to an Enum " - "since it's not a direct alias of a Literal. Only " - "aliases in this form `type my_alias = Literal.'a', " - "'b'.` are supported when generating Enums.", - ): - declare() - elif type_.literal or type_.literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["to-do", "in-progress", "done"]) - is_(col.type.native_enum, False) - elif type_.not_literal or type_.not_literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, String)) - elif type_.recursive: - with expect_deprecated( - "Matching to pep-695 type '_Literal695' in a " - "recursive fashion " - "without the recursed type being present in the " - "type_annotation_map is deprecated; add this type or its " - "recursed value to the type_annotation_map to allow it to " - "match explicitly." - ): - Foo = declare() - else: - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type when resolving " - r"for Python type " - r"indicated by '.+' inside the Mapped\[\] " - r"annotation for the 'status' attribute", - ): - declare() - return - class MixinTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" diff --git a/test/orm/declarative/test_typed_mapping.py b/test/orm/declarative/test_typed_mapping.py index 003872a809..d9c657a8b1 100644 --- a/test/orm/declarative/test_typed_mapping.py +++ b/test/orm/declarative/test_typed_mapping.py @@ -24,7 +24,6 @@ import uuid import typing_extensions from typing_extensions import get_args as get_args from typing_extensions import Literal as Literal -from typing_extensions import TypeAlias as TypeAlias from typing_extensions import TypeAliasType from typing_extensions import TypedDict @@ -96,81 +95,82 @@ from sqlalchemy.testing.fixtures import fixture_session from sqlalchemy.util import compat from sqlalchemy.util.typing import Annotated -TV = typing.TypeVar("TV") + +# try to differentiate between typing_extensions.TypeAliasType +# and typing.TypeAliasType +TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType) -class _SomeDict1(TypedDict): - type: Literal["1"] +@testing.fixture +def pep_695_types(): + global TV + global _UnionPep695, _TypingGenericPep695, _TypingStrPep695 + global _TypingGenericPep695, _TypingGenericPep695Typed + global _TypingLiteral695 + global _Literal695, _RecursiveLiteral695 + global _StrPep695, _GenericPep695, _GenericPep695Typed, _GenericPep695Typed + TV = typing.TypeVar("TV") -class _SomeDict2(TypedDict): - type: Literal["2"] + _StrPep695 = TypeAliasType("_StrPep695", str) # type: ignore + _GenericPep695 = TypeAliasType( # type: ignore + "_GenericPep695", List[TV], type_params=(TV,) + ) + _GenericPep695Typed = _GenericPep695[int] + class _SomeDict1(TypedDict): + type: Literal["1"] -_UnionTypeAlias: TypeAlias = Union[_SomeDict1, _SomeDict2] + class _SomeDict2(TypedDict): + type: Literal["2"] -_StrTypeAlias: TypeAlias = str + _Literal695 = TypeAliasType( # type: ignore + "_Literal695", Literal["to-do", "in-progress", "done"] + ) + _RecursiveLiteral695 = TypeAliasType( # type: ignore + "_RecursiveLiteral695", _Literal695 + ) + _UnionPep695 = TypeAliasType( # type: ignore + "_UnionPep695", Union[_SomeDict1, _SomeDict2] + ) + _TypingStrPep695 = TypingTypeAliasType( # type: ignore + "_TypingStrPep695", str + ) -if compat.py38: - _TypingLiteral = typing.Literal["a", "b"] -_TypingExtensionsLiteral = typing_extensions.Literal["a", "b"] + _TypingGenericPep695 = TypingTypeAliasType( # type: ignore + "_TypingGenericPep695", List[TV], type_params=(TV,) # type: ignore + ) -_JsonPrimitive: TypeAlias = Union[str, int, float, bool, None] -_JsonObject: TypeAlias = Dict[str, "_Json"] -_JsonArray: TypeAlias = List["_Json"] -_Json: TypeAlias = Union[_JsonObject, _JsonArray, _JsonPrimitive] + _TypingGenericPep695Typed = _TypingGenericPep695[int] # type: ignore -if compat.py310: - _JsonPrimitivePep604: TypeAlias = str | int | float | bool | None - _JsonObjectPep604: TypeAlias = dict[str, "_JsonPep604"] - _JsonArrayPep604: TypeAlias = list["_JsonPep604"] - _JsonPep604: TypeAlias = ( - _JsonObjectPep604 | _JsonArrayPep604 | _JsonPrimitivePep604 + _TypingLiteral695 = TypingTypeAliasType( # type: ignore + "_TypingLiteral695", Literal["to-do", "in-progress", "done"] ) - _JsonPep695 = TypeAliasType("_JsonPep695", _JsonPep604) -TypingTypeAliasType = getattr(typing, "TypeAliasType", TypeAliasType) -_StrPep695 = TypeAliasType("_StrPep695", str) -_TypingStrPep695 = TypingTypeAliasType("_TypingStrPep695", str) -_GenericPep695 = TypeAliasType("_GenericPep695", List[TV], type_params=(TV,)) -_TypingGenericPep695 = TypingTypeAliasType( - "_TypingGenericPep695", List[TV], type_params=(TV,) -) -_GenericPep695Typed = _GenericPep695[int] -_TypingGenericPep695Typed = _TypingGenericPep695[int] -_UnionPep695 = TypeAliasType("_UnionPep695", Union[_SomeDict1, _SomeDict2]) -strtypalias_keyword = TypeAliasType( - "strtypalias_keyword", Annotated[str, mapped_column(info={"hi": "there"})] -) -if compat.py310: - strtypalias_keyword_nested = TypeAliasType( - "strtypalias_keyword_nested", - int | Annotated[str, mapped_column(info={"hi": "there"})], +@testing.fixture +def pep_593_types(pep_695_types): + global _GenericPep593TypeAlias, _GenericPep593Pep695 + global _RecursivePep695Pep593 + + _GenericPep593TypeAlias = Annotated[ + TV, mapped_column(info={"hi": "there"}) # type: ignore + ] + + _GenericPep593Pep695 = TypingTypeAliasType( # type: ignore + "_GenericPep593Pep695", + Annotated[TV, mapped_column(info={"hi": "there"})], # type: ignore + type_params=(TV,), + ) + + _RecursivePep695Pep593 = TypingTypeAliasType( # type: ignore + "_RecursivePep695Pep593", + Annotated[ + _TypingStrPep695, # type: ignore + mapped_column(info={"hi": "there"}), + ], ) -strtypalias_ta: TypeAlias = Annotated[str, mapped_column(info={"hi": "there"})] -strtypalias_plain = Annotated[str, mapped_column(info={"hi": "there"})] -_Literal695 = TypeAliasType( - "_Literal695", Literal["to-do", "in-progress", "done"] -) -_TypingLiteral695 = TypingTypeAliasType( - "_TypingLiteral695", Literal["to-do", "in-progress", "done"] -) -_RecursiveLiteral695 = TypeAliasType("_RecursiveLiteral695", _Literal695) - -_GenericPep593TypeAlias = Annotated[TV, mapped_column(info={"hi": "there"})] - -_GenericPep593Pep695 = TypingTypeAliasType( - "_GenericPep593Pep695", - Annotated[TV, mapped_column(info={"hi": "there"})], - type_params=(TV,), -) - -_RecursivePep695Pep593 = TypingTypeAliasType( - "_RecursivePep695Pep593", - Annotated[_TypingStrPep695, mapped_column(info={"hi": "there"})], -) def expect_annotation_syntax_error(name): @@ -214,15 +214,6 @@ class DeclarativeBaseTest(fixtures.TestBase): Tab.non_existent -_annotated_names_tested = set() - - -def annotated_name_test_cases(*cases, **kw): - _annotated_names_tested.update([case[0] for case in cases]) - - return testing.combinations_list(cases, **kw) - - class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @@ -627,7 +618,7 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): anno_str = Annotated[str, 50] anno_str_optional = Annotated[Optional[str], 30] - newtype_str = NewType("MyType", str) + newtype_str = NewType("newtype_str", str) anno_str_mc = Annotated[str, mapped_column()] anno_str_optional_mc = Annotated[Optional[str], mapped_column()] @@ -766,6 +757,11 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): def test_typing_literal_identity(self, decl_base): """See issue #11820""" + # anno only: global _TypingLiteral, _TypingExtensionsLiteral + + _TypingLiteral = typing.Literal["a", "b"] + _TypingExtensionsLiteral = typing_extensions.Literal["a", "b"] + class Foo(decl_base): __tablename__ = "footable" @@ -778,162 +774,187 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): eq_(col.type.enums, ["a", "b"]) is_(col.type.native_enum, False) - @testing.requires.python310 - def test_we_got_all_attrs_test_annotated(self): - argnames = _py_inspect.getfullargspec(mapped_column) - assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( - f"annotated attributes were not tested: " - f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" - ) - - @annotated_name_test_cases( - ("sort_order", 100, lambda sort_order: sort_order == 100), - ("nullable", False, lambda column: column.nullable is False), - ( - "active_history", - True, - lambda column_property: column_property.active_history is True, - ), - ( - "deferred", - True, - lambda column_property: column_property.deferred is True, - ), - ( - "deferred", - _NoArg.NO_ARG, - lambda column_property: column_property is None, - ), - ( - "deferred_group", - "mygroup", - lambda column_property: column_property.deferred is True - and column_property.group == "mygroup", - ), - ( - "deferred_raiseload", - True, - lambda column_property: column_property.deferred is True - and column_property.raiseload is True, - ), - ( - "server_default", - "25", - lambda column: column.server_default.arg == "25", - ), - ( - "server_onupdate", - "25", - lambda column: column.server_onupdate.arg == "25", - ), - ( - "default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "insert_default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "onupdate", - 25, - lambda column: column.onupdate.arg == 25, - ), - ("doc", "some doc", lambda column: column.doc == "some doc"), - ( - "comment", - "some comment", - lambda column: column.comment == "some comment", - ), - ("index", True, lambda column: column.index is True), - ("index", _NoArg.NO_ARG, lambda column: column.index is None), - ("index", False, lambda column: column.index is False), - ("unique", True, lambda column: column.unique is True), - ("unique", False, lambda column: column.unique is False), - ("autoincrement", True, lambda column: column.autoincrement is True), - ("system", True, lambda column: column.system is True), - ("primary_key", True, lambda column: column.primary_key is True), - ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), - ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), - ( - "use_existing_column", - True, - lambda mc: mc._use_existing_column is True, - ), - ( - "quote", - True, - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + @staticmethod + def annotated_name_test_cases(): + return [ + ("sort_order", 100, lambda sort_order: sort_order == 100), + ("nullable", False, lambda column: column.nullable is False), + ( + "active_history", + True, + lambda column_property: column_property.active_history is True, ), - ), - ( - "key", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + ( + "deferred", + True, + lambda column_property: column_property.deferred is True, ), - ), - ( - "name", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " + ( + "deferred", + _NoArg.NO_ARG, + lambda column_property: column_property is None, ), - ), - ( - "kw_only", - True, - exc.SADeprecationWarning( - "Argument 'kw_only' is a dataclass argument " + ( + "deferred_group", + "mygroup", + lambda column_property: column_property.deferred is True + and column_property.group == "mygroup", ), - testing.requires.python310, - ), - ( - "compare", - True, - exc.SADeprecationWarning( - "Argument 'compare' is a dataclass argument " + ( + "deferred_raiseload", + True, + lambda column_property: column_property.deferred is True + and column_property.raiseload is True, ), - testing.requires.python310, - ), - ( - "default_factory", - lambda: 25, - exc.SADeprecationWarning( - "Argument 'default_factory' is a dataclass argument " + ( + "server_default", + "25", + lambda column: column.server_default.arg == "25", ), - ), - ( - "repr", - True, - exc.SADeprecationWarning( - "Argument 'repr' is a dataclass argument " + ( + "server_onupdate", + "25", + lambda column: column.server_onupdate.arg == "25", ), - ), - ( - "init", - True, - exc.SADeprecationWarning( - "Argument 'init' is a dataclass argument" + ( + "default", + 25, + lambda column: column.default.arg == 25, ), - ), - ( - "hash", - True, - exc.SADeprecationWarning( - "Argument 'hash' is a dataclass argument" + ( + "insert_default", + 25, + lambda column: column.default.arg == 25, ), - ), - ( - "dataclass_metadata", - {}, - exc.SADeprecationWarning( - "Argument 'dataclass_metadata' is a dataclass argument" + ( + "onupdate", + 25, + lambda column: column.onupdate.arg == 25, ), - ), + ("doc", "some doc", lambda column: column.doc == "some doc"), + ( + "comment", + "some comment", + lambda column: column.comment == "some comment", + ), + ("index", True, lambda column: column.index is True), + ("index", _NoArg.NO_ARG, lambda column: column.index is None), + ("index", False, lambda column: column.index is False), + ("unique", True, lambda column: column.unique is True), + ("unique", False, lambda column: column.unique is False), + ( + "autoincrement", + True, + lambda column: column.autoincrement is True, + ), + ("system", True, lambda column: column.system is True), + ("primary_key", True, lambda column: column.primary_key is True), + ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), + ( + "info", + {"foo": "bar"}, + lambda column: column.info == {"foo": "bar"}, + ), + ( + "use_existing_column", + True, + lambda mc: mc._use_existing_column is True, + ), + ( + "quote", + True, + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "key", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "name", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "kw_only", + True, + exc.SADeprecationWarning( + "Argument 'kw_only' is a dataclass argument " + ), + testing.requires.python310, + ), + ( + "compare", + True, + exc.SADeprecationWarning( + "Argument 'compare' is a dataclass argument " + ), + testing.requires.python310, + ), + ( + "default_factory", + lambda: 25, + exc.SADeprecationWarning( + "Argument 'default_factory' is a dataclass argument " + ), + ), + ( + "repr", + True, + exc.SADeprecationWarning( + "Argument 'repr' is a dataclass argument " + ), + ), + ( + "init", + True, + exc.SADeprecationWarning( + "Argument 'init' is a dataclass argument" + ), + ), + ( + "hash", + True, + exc.SADeprecationWarning( + "Argument 'hash' is a dataclass argument" + ), + ), + ( + "dataclass_metadata", + {}, + exc.SADeprecationWarning( + "Argument 'dataclass_metadata' is a dataclass argument" + ), + ), + ] + + if not compat.py310: + annotated_name_test_cases = annotated_name_test_cases.__func__ + static_annotated_name_test_cases = staticmethod( + annotated_name_test_cases + ) + else: + static_annotated_name_test_cases = annotated_name_test_cases + + def test_we_got_all_attrs_test_annotated(self): + argnames = _py_inspect.getfullargspec(mapped_column) + _annotated_names_tested = { + case[0] for case in self.static_annotated_name_test_cases() + } + assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( + f"annotated attributes were not tested: " + f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" + ) + + @testing.requires.python310 + @testing.combinations_list( + annotated_name_test_cases(), argnames="argname, argument, assertion", ) @testing.variation("use_annotated", [True, False, "control"]) @@ -1051,369 +1072,106 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): else: is_(result, orig) - @testing.variation( - "union", - [ - "union", - ("pep604", requires.python310), - "union_null", - ("pep604_null", requires.python310), - ], - ) - def test_unions(self, union): - # anno only: global UnionType - our_type = Numeric(10, 2) + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" + + global T + + T = Dict[str, Optional["T"]] + + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): + + class TypeTest(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + + return - if union.union: - UnionType = Union[float, Decimal] - elif union.union_null: - UnionType = Union[float, Decimal, None] - elif union.pep604: - UnionType = float | Decimal - elif union.pep604_null: - UnionType = float | Decimal | None else: - union.fail() + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) - class Base(DeclarativeBase): - type_annotation_map = {UnionType: our_type} + class TypeTest(decl_base): + __tablename__ = "my_table" - class User(Base): + id: Mapped[int] = mapped_column(primary_key=True) + + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) + + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) + + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", + ) + + def test_missing_mapped_lhs(self, decl_base): + with expect_annotation_syntax_error("User.name"): + + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: str = mapped_column() # type: ignore + + def test_construct_lhs_separate_name(self, decl_base): + class User(decl_base): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + data: Mapped[Optional[str]] = mapped_column("the_data") - data: Mapped[Union[float, Decimal]] - reverse_data: Mapped[Union[Decimal, float]] + self.assert_compile( + select(User.data), "SELECT users.the_data FROM users" + ) + is_true(User.__table__.c.the_data.nullable) - optional_data: Mapped[Optional[Union[float, Decimal]]] = ( - mapped_column() - ) + def test_construct_works_in_expr(self, decl_base): + class User(decl_base): + __tablename__ = "users" - # use Optional directly - reverse_optional_data: Mapped[Optional[Union[Decimal, float]]] = ( - mapped_column() - ) + id: Mapped[int] = mapped_column(primary_key=True) - # use Union with None, same as Optional but presents differently - # (Optional object with __origin__ Union vs. Union) - reverse_u_optional_data: Mapped[Union[Decimal, float, None]] = ( - mapped_column() - ) + class Address(decl_base): + __tablename__ = "addresses" - refer_union: Mapped[UnionType] - refer_union_optional: Mapped[Optional[UnionType]] + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - # py38, 37 does not automatically flatten unions, add extra tests - # for this. maintain these in order to catch future regressions - # in the behavior of ``Union`` - unflat_union_optional_data: Mapped[ - Union[Union[Decimal, float, None], None] - ] = mapped_column() + user = relationship(User, primaryjoin=user_id == User.id) - float_data: Mapped[float] = mapped_column() - decimal_data: Mapped[Decimal] = mapped_column() - - if compat.py310: - pep604_data: Mapped[float | Decimal] = mapped_column() - pep604_reverse: Mapped[Decimal | float] = mapped_column() - pep604_optional: Mapped[Decimal | float | None] = ( - mapped_column() - ) - pep604_data_fwd: Mapped["float | Decimal"] = mapped_column() - pep604_reverse_fwd: Mapped["Decimal | float"] = mapped_column() - pep604_optional_fwd: Mapped["Decimal | float | None"] = ( - mapped_column() - ) - - info = [ - ("data", False), - ("reverse_data", False), - ("optional_data", True), - ("reverse_optional_data", True), - ("reverse_u_optional_data", True), - ("refer_union", "null" in union.name), - ("refer_union_optional", True), - ("unflat_union_optional_data", True), - ] - if compat.py310: - info += [ - ("pep604_data", False), - ("pep604_reverse", False), - ("pep604_optional", True), - ("pep604_data_fwd", False), - ("pep604_reverse_fwd", False), - ("pep604_optional_fwd", True), - ] - - for name, nullable in info: - col = User.__table__.c[name] - is_(col.type, our_type, name) - is_(col.nullable, nullable, name) - - is_true(isinstance(User.__table__.c.float_data.type, Float)) - ne_(User.__table__.c.float_data.type, our_type) - - is_true(isinstance(User.__table__.c.decimal_data.type, Numeric)) - ne_(User.__table__.c.decimal_data.type, our_type) - - @testing.variation( - "union", - [ - "union", - ("pep604", requires.python310), - ("pep695", requires.python312), - ], - ) - def test_optional_in_annotation_map(self, union): - """See issue #11370""" - - class Base(DeclarativeBase): - if union.union: - type_annotation_map = {_Json: JSON} - elif union.pep604: - type_annotation_map = {_JsonPep604: JSON} - elif union.pep695: - type_annotation_map = {_JsonPep695: JSON} # noqa: F821 - else: - union.fail() - - class A(Base): - __tablename__ = "a" - - id: Mapped[int] = mapped_column(primary_key=True) - if union.union: - json1: Mapped[_Json] - json2: Mapped[_Json] = mapped_column(nullable=False) - elif union.pep604: - json1: Mapped[_JsonPep604] - json2: Mapped[_JsonPep604] = mapped_column(nullable=False) - elif union.pep695: - json1: Mapped[_JsonPep695] # noqa: F821 - json2: Mapped[_JsonPep695] = mapped_column( # noqa: F821 - nullable=False - ) - else: - union.fail() - - is_(A.__table__.c.json1.type._type_affinity, JSON) - is_(A.__table__.c.json2.type._type_affinity, JSON) - is_true(A.__table__.c.json1.nullable) - is_false(A.__table__.c.json2.nullable) - - @testing.variation( - "option", - [ - "not_optional", - "optional", - "optional_fwd_ref", - "union_none", - ("pep604", testing.requires.python310), - ("pep604_fwd_ref", testing.requires.python310), - ], - ) - @testing.variation("brackets", ["oneset", "twosets"]) - @testing.combinations( - "include_mc_type", "derive_from_anno", argnames="include_mc_type" - ) - def test_optional_styles_nested_brackets( - self, option, brackets, include_mc_type - ): - """composed types test, includes tests that were added later for - #12207""" - - class Base(DeclarativeBase): - if testing.requires.python310.enabled: - type_annotation_map = { - Dict[str, Decimal]: JSON, - dict[str, Decimal]: JSON, - Union[List[int], List[str]]: JSON, - list[int] | list[str]: JSON, - } - else: - type_annotation_map = { - Dict[str, Decimal]: JSON, - Union[List[int], List[str]]: JSON, - } - - if include_mc_type == "include_mc_type": - mc = mapped_column(JSON) - mc2 = mapped_column(JSON) - else: - mc = mapped_column() - mc2 = mapped_column() - - class A(Base): - __tablename__ = "a" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = mapped_column() - - if brackets.oneset: - if option.not_optional: - json: Mapped[Dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 - if testing.requires.python310.enabled: - json2: Mapped[dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 - elif option.optional: - json: Mapped[Optional[Dict[str, Decimal]]] = mc - if testing.requires.python310.enabled: - json2: Mapped[Optional[dict[str, Decimal]]] = mc2 - elif option.optional_fwd_ref: - json: Mapped["Optional[Dict[str, Decimal]]"] = mc - if testing.requires.python310.enabled: - json2: Mapped["Optional[dict[str, Decimal]]"] = mc2 - elif option.union_none: - json: Mapped[Union[Dict[str, Decimal], None]] = mc - json2: Mapped[Union[None, Dict[str, Decimal]]] = mc2 - elif option.pep604: - json: Mapped[dict[str, Decimal] | None] = mc - if testing.requires.python310.enabled: - json2: Mapped[None | dict[str, Decimal]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["dict[str, Decimal] | None"] = mc - if testing.requires.python310.enabled: - json2: Mapped["None | dict[str, Decimal]"] = mc2 - elif brackets.twosets: - if option.not_optional: - json: Mapped[Union[List[int], List[str]]] = mapped_column() # type: ignore # noqa: E501 - elif option.optional: - json: Mapped[Optional[Union[List[int], List[str]]]] = mc - if testing.requires.python310.enabled: - json2: Mapped[ - Optional[Union[list[int], list[str]]] - ] = mc2 - elif option.optional_fwd_ref: - json: Mapped["Optional[Union[List[int], List[str]]]"] = mc - if testing.requires.python310.enabled: - json2: Mapped[ - "Optional[Union[list[int], list[str]]]" - ] = mc2 - elif option.union_none: - json: Mapped[Union[List[int], List[str], None]] = mc - if testing.requires.python310.enabled: - json2: Mapped[Union[None, list[int], list[str]]] = mc2 - elif option.pep604: - json: Mapped[list[int] | list[str] | None] = mc - json2: Mapped[None | list[int] | list[str]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["list[int] | list[str] | None"] = mc - json2: Mapped["None | list[int] | list[str]"] = mc2 - else: - brackets.fail() - - is_(A.__table__.c.json.type._type_affinity, JSON) - if hasattr(A, "json2"): - is_(A.__table__.c.json2.type._type_affinity, JSON) - if option.not_optional: - is_false(A.__table__.c.json2.nullable) - else: - is_true(A.__table__.c.json2.nullable) - - if option.not_optional: - is_false(A.__table__.c.json.nullable) - else: - is_true(A.__table__.c.json.nullable) - - @testing.variation("optional", [True, False]) - @testing.variation("provide_type", [True, False]) - @testing.variation("add_to_type_map", [True, False]) - def test_recursive_type( - self, decl_base, optional, provide_type, add_to_type_map - ): - """test #9553""" - - global T - - T = Dict[str, Optional["T"]] - - if not provide_type and not add_to_type_map: - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", - ): - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - - return - - else: - if add_to_type_map: - decl_base.registry.update_type_annotation_map({T: JSON()}) - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if add_to_type_map: - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - else: - if optional: - type_test: Mapped[Optional[T]] = mapped_column(JSON()) - else: - type_test: Mapped[T] = mapped_column(JSON()) - - if optional: - is_(TypeTest.__table__.c.type_test.nullable, True) - else: - is_(TypeTest.__table__.c.type_test.nullable, False) - - self.assert_compile( - select(TypeTest), - "SELECT my_table.id, my_table.type_test FROM my_table", - ) - - def test_missing_mapped_lhs(self, decl_base): - with expect_annotation_syntax_error("User.name"): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: str = mapped_column() # type: ignore - - def test_construct_lhs_separate_name(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - data: Mapped[Optional[str]] = mapped_column("the_data") - - self.assert_compile( - select(User.data), "SELECT users.the_data FROM users" - ) - is_true(User.__table__.c.the_data.nullable) - - def test_construct_works_in_expr(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - - class Address(decl_base): - __tablename__ = "addresses" - - id: Mapped[int] = mapped_column(primary_key=True) - user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - - user = relationship(User, primaryjoin=user_id == User.id) - - self.assert_compile( - select(Address.user_id, User.id).join(Address.user), - "SELECT addresses.user_id, users.id FROM addresses " - "JOIN users ON addresses.user_id = users.id", - ) + self.assert_compile( + select(Address.user_id, User.id).join(Address.user), + "SELECT addresses.user_id, users.id FROM addresses " + "JOIN users ON addresses.user_id = users.id", + ) def test_construct_works_as_polymorphic_on(self, decl_base): class User(decl_base): @@ -1542,6 +1300,21 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): self, decl_base: Type[DeclarativeBase], alias_type ): """test #11130""" + + # anno only: global strtypalias_keyword, strtypalias_keyword_nested + # anno only: global strtypalias_ta, strtypalias_plain + + strtypalias_keyword = TypeAliasType( + "strtypalias_keyword", + Annotated[str, mapped_column(info={"hi": "there"})], + ) + strtypalias_keyword_nested = TypeAliasType( + "strtypalias_keyword_nested", + int | Annotated[str, mapped_column(info={"hi": "there"})], + ) + strtypalias_ta = Annotated[str, mapped_column(info={"hi": "there"})] + strtypalias_plain = Annotated[str, mapped_column(info={"hi": "there"})] + if alias_type.typekeyword: decl_base.registry.update_type_annotation_map( {strtypalias_keyword: VARCHAR(33)} # noqa: F821 @@ -1585,60 +1358,28 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): @testing.requires.python312 def test_no_recursive_pep593_from_pep695( - self, decl_base: Type[DeclarativeBase] + self, decl_base: Type[DeclarativeBase], pep_593_types ): + def declare(): class MyClass(decl_base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 - - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type when resolving for Python " - r"type " - r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " - r"annotation for the 'data_one' attribute; none of " - r"'_RecursivePep695Pep593', " - r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " - r"are resolvable by the registry", - ): - declare() - - @testing.variation("in_map", [True, False]) - @testing.variation("alias_type", ["plain", "pep695"]) - @testing.requires.python312 - def test_generic_typealias_pep593( - self, decl_base: Type[DeclarativeBase], alias_type: Variation, in_map - ): - - if in_map: - decl_base.registry.update_type_annotation_map( - { - _GenericPep593TypeAlias[str]: VARCHAR(33), - _GenericPep593Pep695[str]: VARCHAR(33), - } - ) - - class MyClass(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if alias_type.plain: - data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 - elif alias_type.pep695: - data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 - else: - alias_type.fail() - - eq_(MyClass.data_one.expression.info, {"hi": "there"}) - if in_map: - eq_(MyClass.data_one.expression.type.length, 33) - else: - eq_(MyClass.data_one.expression.type.length, None) + data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 + + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving for Python " + r"type " + r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " + r"annotation for the 'data_one' attribute; none of " + r"'_RecursivePep695Pep593', " + r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " + r"are resolvable by the registry", + ): + declare() def test_extract_base_type_from_pep593( self, decl_base: Type[DeclarativeBase] @@ -2165,6 +1906,43 @@ class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): eq_(A_1.label.property.columns[0].table, A.__table__) eq_(A_2.label.property.columns[0].table, A.__table__) + @testing.variation("in_map", [True, False]) + @testing.variation("alias_type", ["plain", "pep695"]) + @testing.requires.python312 + def test_generic_typealias_pep593( + self, + decl_base: Type[DeclarativeBase], + alias_type: Variation, + in_map, + pep_593_types, + ): + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _GenericPep593TypeAlias[str]: VARCHAR(33), + _GenericPep593Pep695[str]: VARCHAR(33), + } + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + if alias_type.plain: + data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 + elif alias_type.pep695: + data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 + else: + alias_type.fail() + + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + if in_map: + eq_(MyClass.data_one.expression.type.length, 33) + else: + eq_(MyClass.data_one.expression.type.length, None) + class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @@ -2554,6 +2332,18 @@ class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): def test_plain_typealias_as_typemap_keys( self, decl_base: Type[DeclarativeBase] ): + + # anno only: global _StrTypeAlias, _UnionTypeAlias + + class _SomeDict1(TypedDict): + type: Literal["1"] + + class _SomeDict2(TypedDict): + type: Literal["2"] + + _StrTypeAlias = str + _UnionTypeAlias = Union[_SomeDict1, _SomeDict2] + decl_base.registry.update_type_annotation_map( {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} ) @@ -2662,117 +2452,454 @@ class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): return @testing.variation( - "type_", + "type_", + [ + "str_extension", + "str_typing", + "generic_extension", + "generic_typing", + "generic_typed_extension", + "generic_typed_typing", + ], + ) + @testing.requires.python312 + def test_pep695_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase], type_, pep_695_types + ): + """test #10807, #12829""" + + decl_base.registry.update_type_annotation_map( + { + _UnionPep695: JSON, + _StrPep695: String(30), + _TypingStrPep695: String(30), + _GenericPep695: String(30), + _TypingGenericPep695: String(30), + _GenericPep695Typed: String(30), + _TypingGenericPep695Typed: String(30), + } + ) + + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + if type_.str_extension: + data: Mapped[_StrPep695] + elif type_.str_typing: + data: Mapped[_TypingStrPep695] + elif type_.generic_extension: + data: Mapped[_GenericPep695] + elif type_.generic_typing: + data: Mapped[_TypingGenericPep695] + elif type_.generic_typed_extension: + data: Mapped[_GenericPep695Typed] + elif type_.generic_typed_typing: + data: Mapped[_TypingGenericPep695Typed] + else: + type_.fail() + structure: Mapped[_UnionPep695] + + eq_(Test.__table__.c.data.type._type_affinity, String) + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) + + def test_pep484_newtypes_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + # anno only: global str50, str30, str3050 + + str50 = NewType("str50", str) + str30 = NewType("str30", str) + str3050 = NewType("str30", str50) + + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30), str3050: String(150)} + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[str50] + data_three: Mapped[Optional[str30]] + data_four: Mapped[str3050] + + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) + + eq_(MyClass.__table__.c.data_two.type.length, 50) + is_false(MyClass.__table__.c.data_two.nullable) + + eq_(MyClass.__table__.c.data_three.type.length, 30) + is_true(MyClass.__table__.c.data_three.nullable) + + eq_(MyClass.__table__.c.data_four.type.length, 150) + is_false(MyClass.__table__.c.data_four.nullable) + + def test_newtype_missing_from_map(self, decl_base): + # anno only: global str50 + + str50 = NewType("str50", str) + + if compat.py310: + text = ".*str50" + else: + # NewTypes before 3.10 had a very bad repr + # .new_type at 0x...> + text = ".*NewType.*" + + with expect_deprecated( + f"Matching the provided NewType '{text}' on its " + "resolved value without matching it in the " + "type_annotation_map is deprecated; add this type to the " + "type_annotation_map to allow it to match explicitly.", + ): + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + data_one: Mapped[str50] + + is_true(isinstance(MyClass.data_one.type, String)) + + @testing.variation( + "union", + [ + "union", + ("pep604", requires.python310), + "union_null", + ("pep604_null", requires.python310), + ], + ) + def test_unions(self, union): + # anno only: global UnionType + our_type = Numeric(10, 2) + + if union.union: + UnionType = Union[float, Decimal] + elif union.union_null: + UnionType = Union[float, Decimal, None] + elif union.pep604: + UnionType = float | Decimal + elif union.pep604_null: + UnionType = float | Decimal | None + else: + union.fail() + + class Base(DeclarativeBase): + type_annotation_map = {UnionType: our_type} + + class User(Base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + + data: Mapped[Union[float, Decimal]] + reverse_data: Mapped[Union[Decimal, float]] + + optional_data: Mapped[Optional[Union[float, Decimal]]] = ( + mapped_column() + ) + + # use Optional directly + reverse_optional_data: Mapped[Optional[Union[Decimal, float]]] = ( + mapped_column() + ) + + # use Union with None, same as Optional but presents differently + # (Optional object with __origin__ Union vs. Union) + reverse_u_optional_data: Mapped[Union[Decimal, float, None]] = ( + mapped_column() + ) + + refer_union: Mapped[UnionType] + refer_union_optional: Mapped[Optional[UnionType]] + + # py38, 37 does not automatically flatten unions, add extra tests + # for this. maintain these in order to catch future regressions + # in the behavior of ``Union`` + unflat_union_optional_data: Mapped[ + Union[Union[Decimal, float, None], None] + ] = mapped_column() + + float_data: Mapped[float] = mapped_column() + decimal_data: Mapped[Decimal] = mapped_column() + + if compat.py310: + pep604_data: Mapped[float | Decimal] = mapped_column() + pep604_reverse: Mapped[Decimal | float] = mapped_column() + pep604_optional: Mapped[Decimal | float | None] = ( + mapped_column() + ) + pep604_data_fwd: Mapped["float | Decimal"] = mapped_column() + pep604_reverse_fwd: Mapped["Decimal | float"] = mapped_column() + pep604_optional_fwd: Mapped["Decimal | float | None"] = ( + mapped_column() + ) + + info = [ + ("data", False), + ("reverse_data", False), + ("optional_data", True), + ("reverse_optional_data", True), + ("reverse_u_optional_data", True), + ("refer_union", "null" in union.name), + ("refer_union_optional", True), + ("unflat_union_optional_data", True), + ] + if compat.py310: + info += [ + ("pep604_data", False), + ("pep604_reverse", False), + ("pep604_optional", True), + ("pep604_data_fwd", False), + ("pep604_reverse_fwd", False), + ("pep604_optional_fwd", True), + ] + + for name, nullable in info: + col = User.__table__.c[name] + is_(col.type, our_type, name) + is_(col.nullable, nullable, name) + + is_true(isinstance(User.__table__.c.float_data.type, Float)) + ne_(User.__table__.c.float_data.type, our_type) + + is_true(isinstance(User.__table__.c.decimal_data.type, Numeric)) + ne_(User.__table__.c.decimal_data.type, our_type) + + @testing.variation( + "union", [ - "str_extension", - "str_typing", - "generic_extension", - "generic_typing", - "generic_typed_extension", - "generic_typed_typing", + "union", + ("pep604", requires.python310), + ("pep695", requires.python312), ], ) - @testing.requires.python312 - def test_pep695_typealias_as_typemap_keys( - self, decl_base: Type[DeclarativeBase], type_ - ): - """test #10807, #12829""" + def test_optional_in_annotation_map(self, union): + """See issue #11370""" - decl_base.registry.update_type_annotation_map( - { - _UnionPep695: JSON, - _StrPep695: String(30), - _TypingStrPep695: String(30), - _GenericPep695: String(30), - _TypingGenericPep695: String(30), - _GenericPep695Typed: String(30), - _TypingGenericPep695Typed: String(30), - } - ) + global _Json, _JsonPep604, _JsonPep695 + + _JsonPrimitive = Union[str, int, float, bool, None] + _JsonObject = Dict[str, "_Json"] + _JsonArray = List["_Json"] + _Json = Union[_JsonObject, _JsonArray, _JsonPrimitive] + if requires.python310.enabled: + _JsonPrimitivePep604 = str | int | float | bool | None + _JsonObjectPep604 = dict[str, "_JsonPep604"] + _JsonArrayPep604 = list["_JsonPep604"] + _JsonPep604 = ( + _JsonObjectPep604 | _JsonArrayPep604 | _JsonPrimitivePep604 + ) + _JsonPep695 = TypeAliasType("_JsonPep695", _JsonPep604) + + class Base(DeclarativeBase): + if union.union: + type_annotation_map = {_Json: JSON} + elif union.pep604: + type_annotation_map = {_JsonPep604: JSON} + elif union.pep695: + type_annotation_map = {_JsonPep695: JSON} # noqa: F821 + else: + union.fail() + + class A(Base): + __tablename__ = "a" - class Test(decl_base): - __tablename__ = "test" id: Mapped[int] = mapped_column(primary_key=True) - if type_.str_extension: - data: Mapped[_StrPep695] - elif type_.str_typing: - data: Mapped[_TypingStrPep695] - elif type_.generic_extension: - data: Mapped[_GenericPep695] - elif type_.generic_typing: - data: Mapped[_TypingGenericPep695] - elif type_.generic_typed_extension: - data: Mapped[_GenericPep695Typed] - elif type_.generic_typed_typing: - data: Mapped[_TypingGenericPep695Typed] + if union.union: + json1: Mapped[_Json] + json2: Mapped[_Json] = mapped_column(nullable=False) + elif union.pep604: + json1: Mapped[_JsonPep604] + json2: Mapped[_JsonPep604] = mapped_column(nullable=False) + elif union.pep695: + json1: Mapped[_JsonPep695] # noqa: F821 + json2: Mapped[_JsonPep695] = mapped_column( # noqa: F821 + nullable=False + ) else: - type_.fail() - structure: Mapped[_UnionPep695] + union.fail() - eq_(Test.__table__.c.data.type._type_affinity, String) - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + is_(A.__table__.c.json1.type._type_affinity, JSON) + is_(A.__table__.c.json2.type._type_affinity, JSON) + is_true(A.__table__.c.json1.nullable) + is_false(A.__table__.c.json2.nullable) - def test_pep484_newtypes_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] + @testing.variation( + "option", + [ + "not_optional", + "optional", + "optional_fwd_ref", + "union_none", + ("pep604", testing.requires.python310), + ("pep604_fwd_ref", testing.requires.python310), + ], + ) + @testing.variation("brackets", ["oneset", "twosets"]) + @testing.combinations( + "include_mc_type", "derive_from_anno", argnames="include_mc_type" + ) + def test_optional_styles_nested_brackets( + self, option, brackets, include_mc_type ): - # anno only: global str50, str30, str3050 + """composed types test, includes tests that were added later for + #12207""" - str50 = NewType("str50", str) - str30 = NewType("str30", str) - str3050 = NewType("str30", str50) + class Base(DeclarativeBase): + if testing.requires.python310.enabled: + type_annotation_map = { + Dict[str, Decimal]: JSON, + dict[str, Decimal]: JSON, + Union[List[int], List[str]]: JSON, + list[int] | list[str]: JSON, + } + else: + type_annotation_map = { + Dict[str, Decimal]: JSON, + Union[List[int], List[str]]: JSON, + } - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30), str3050: String(150)} - ) + if include_mc_type == "include_mc_type": + mc = mapped_column(JSON) + mc2 = mapped_column(JSON) + else: + mc = mapped_column() + mc2 = mapped_column() - class MyClass(decl_base): - __tablename__ = "my_table" + class A(Base): + __tablename__ = "a" - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[str50] - data_three: Mapped[Optional[str30]] - data_four: Mapped[str3050] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = mapped_column() - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + if brackets.oneset: + if option.not_optional: + json: Mapped[Dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 + if testing.requires.python310.enabled: + json2: Mapped[dict[str, Decimal]] = mapped_column() # type: ignore # noqa: E501 + elif option.optional: + json: Mapped[Optional[Dict[str, Decimal]]] = mc + if testing.requires.python310.enabled: + json2: Mapped[Optional[dict[str, Decimal]]] = mc2 + elif option.optional_fwd_ref: + json: Mapped["Optional[Dict[str, Decimal]]"] = mc + if testing.requires.python310.enabled: + json2: Mapped["Optional[dict[str, Decimal]]"] = mc2 + elif option.union_none: + json: Mapped[Union[Dict[str, Decimal], None]] = mc + json2: Mapped[Union[None, Dict[str, Decimal]]] = mc2 + elif option.pep604: + json: Mapped[dict[str, Decimal] | None] = mc + if testing.requires.python310.enabled: + json2: Mapped[None | dict[str, Decimal]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["dict[str, Decimal] | None"] = mc + if testing.requires.python310.enabled: + json2: Mapped["None | dict[str, Decimal]"] = mc2 + elif brackets.twosets: + if option.not_optional: + json: Mapped[Union[List[int], List[str]]] = mapped_column() # type: ignore # noqa: E501 + elif option.optional: + json: Mapped[Optional[Union[List[int], List[str]]]] = mc + if testing.requires.python310.enabled: + json2: Mapped[ + Optional[Union[list[int], list[str]]] + ] = mc2 + elif option.optional_fwd_ref: + json: Mapped["Optional[Union[List[int], List[str]]]"] = mc + if testing.requires.python310.enabled: + json2: Mapped[ + "Optional[Union[list[int], list[str]]]" + ] = mc2 + elif option.union_none: + json: Mapped[Union[List[int], List[str], None]] = mc + if testing.requires.python310.enabled: + json2: Mapped[Union[None, list[int], list[str]]] = mc2 + elif option.pep604: + json: Mapped[list[int] | list[str] | None] = mc + json2: Mapped[None | list[int] | list[str]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["list[int] | list[str] | None"] = mc + json2: Mapped["None | list[int] | list[str]"] = mc2 + else: + brackets.fail() - eq_(MyClass.__table__.c.data_two.type.length, 50) - is_false(MyClass.__table__.c.data_two.nullable) + is_(A.__table__.c.json.type._type_affinity, JSON) + if hasattr(A, "json2"): + is_(A.__table__.c.json2.type._type_affinity, JSON) + if option.not_optional: + is_false(A.__table__.c.json2.nullable) + else: + is_true(A.__table__.c.json2.nullable) - eq_(MyClass.__table__.c.data_three.type.length, 30) - is_true(MyClass.__table__.c.data_three.nullable) + if option.not_optional: + is_false(A.__table__.c.json.nullable) + else: + is_true(A.__table__.c.json.nullable) - eq_(MyClass.__table__.c.data_four.type.length, 150) - is_false(MyClass.__table__.c.data_four.nullable) + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" - def test_newtype_missing_from_map(self, decl_base): - # anno only: global str50 + global T - str50 = NewType("str50", str) + T = Dict[str, Optional["T"]] - if compat.py310: - text = ".*str50" - else: - # NewTypes before 3.10 had a very bad repr - # .new_type at 0x...> - text = ".*NewType.*" + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): + + class TypeTest(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() - with expect_deprecated( - f"Matching the provided NewType '{text}' on its " - "resolved value without matching it in the " - "type_annotation_map is deprecated; add this type to the " - "type_annotation_map to allow it to match explicitly.", - ): + return - class MyClass(decl_base): + else: + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) + + class TypeTest(decl_base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[str50] - is_true(isinstance(MyClass.data_one.type, String)) + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) + + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) + + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", + ) class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): @@ -2906,6 +3033,119 @@ class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_(MyClass.__table__.c.data.type.enum_class, FooEnum) eq_(MyClass.__table__.c.data.type.name, "fooenum") # and not 'enum' + @testing.variation( + "type_", + [ + "literal", + "literal_typing", + "recursive", + "not_literal", + "not_literal_typing", + "generic", + "generic_typing", + "generic_typed", + "generic_typed_typing", + ], + ) + @testing.combinations(True, False, argnames="in_map") + @testing.requires.python312 + def test_pep695_literal_defaults_to_enum( + self, decl_base, type_, in_map, pep_695_types + ): + """test #11305.""" + + def declare(): + class Foo(decl_base): + __tablename__ = "footable" + + id: Mapped[int] = mapped_column(primary_key=True) + if type_.recursive: + status: Mapped[_RecursiveLiteral695] # noqa: F821 + elif type_.literal: + status: Mapped[_Literal695] # noqa: F821 + elif type_.literal_typing: + status: Mapped[_TypingLiteral695] # noqa: F821 + elif type_.not_literal: + status: Mapped[_StrPep695] # noqa: F821 + elif type_.not_literal_typing: + status: Mapped[_TypingStrPep695] # noqa: F821 + elif type_.generic: + status: Mapped[_GenericPep695] # noqa: F821 + elif type_.generic_typing: + status: Mapped[_TypingGenericPep695] # noqa: F821 + elif type_.generic_typed: + status: Mapped[_GenericPep695Typed] # noqa: F821 + elif type_.generic_typed_typing: + status: Mapped[_TypingGenericPep695Typed] # noqa: F821 + else: + type_.fail() + + return Foo + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _Literal695: Enum(enum.Enum), # noqa: F821 + _TypingLiteral695: Enum(enum.Enum), # noqa: F821 + _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 + _StrPep695: Enum(enum.Enum), # noqa: F821 + _TypingStrPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 + } + ) + if type_.recursive: + with expect_deprecated( + "Mapping recursive TypeAliasType '.+' that resolve to " + "literal to generate an Enum is deprecated. SQLAlchemy " + "2.1 will not support this use case. Please avoid using " + "recursing TypeAliasType", + ): + Foo = declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + else: + with expect_raises_message( + exc.ArgumentError, + "Can't associate TypeAliasType '.+' to an Enum " + "since it's not a direct alias of a Literal. Only " + "aliases in this form `type my_alias = Literal.'a', " + "'b'.` are supported when generating Enums.", + ): + declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + elif type_.not_literal or type_.not_literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, String)) + elif type_.recursive: + with expect_deprecated( + "Matching to pep-695 type '_Literal695' in a " + "recursive fashion " + "without the recursed type being present in the " + "type_annotation_map is deprecated; add this type or its " + "recursed value to the type_annotation_map to allow it to " + "match explicitly." + ): + Foo = declare() + else: + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.+' inside the Mapped\[\] " + r"annotation for the 'status' attribute", + ): + declare() + return + @testing.variation( "sqltype", [ @@ -3082,117 +3322,6 @@ class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(isinstance(Foo.__table__.c.status.type, JSON)) - @testing.variation( - "type_", - [ - "literal", - "literal_typing", - "recursive", - "not_literal", - "not_literal_typing", - "generic", - "generic_typing", - "generic_typed", - "generic_typed_typing", - ], - ) - @testing.combinations(True, False, argnames="in_map") - @testing.requires.python312 - def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): - """test #11305.""" - - def declare(): - class Foo(decl_base): - __tablename__ = "footable" - - id: Mapped[int] = mapped_column(primary_key=True) - if type_.recursive: - status: Mapped[_RecursiveLiteral695] # noqa: F821 - elif type_.literal: - status: Mapped[_Literal695] # noqa: F821 - elif type_.literal_typing: - status: Mapped[_TypingLiteral695] # noqa: F821 - elif type_.not_literal: - status: Mapped[_StrPep695] # noqa: F821 - elif type_.not_literal_typing: - status: Mapped[_TypingStrPep695] # noqa: F821 - elif type_.generic: - status: Mapped[_GenericPep695] # noqa: F821 - elif type_.generic_typing: - status: Mapped[_TypingGenericPep695] # noqa: F821 - elif type_.generic_typed: - status: Mapped[_GenericPep695Typed] # noqa: F821 - elif type_.generic_typed_typing: - status: Mapped[_TypingGenericPep695Typed] # noqa: F821 - else: - type_.fail() - - return Foo - - if in_map: - decl_base.registry.update_type_annotation_map( - { - _Literal695: Enum(enum.Enum), # noqa: F821 - _TypingLiteral695: Enum(enum.Enum), # noqa: F821 - _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 - _StrPep695: Enum(enum.Enum), # noqa: F821 - _TypingStrPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 - } - ) - if type_.recursive: - with expect_deprecated( - "Mapping recursive TypeAliasType '.+' that resolve to " - "literal to generate an Enum is deprecated. SQLAlchemy " - "2.1 will not support this use case. Please avoid using " - "recursing TypeAliasType", - ): - Foo = declare() - elif type_.literal or type_.literal_typing: - Foo = declare() - else: - with expect_raises_message( - exc.ArgumentError, - "Can't associate TypeAliasType '.+' to an Enum " - "since it's not a direct alias of a Literal. Only " - "aliases in this form `type my_alias = Literal.'a', " - "'b'.` are supported when generating Enums.", - ): - declare() - elif type_.literal or type_.literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["to-do", "in-progress", "done"]) - is_(col.type.native_enum, False) - elif type_.not_literal or type_.not_literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, String)) - elif type_.recursive: - with expect_deprecated( - "Matching to pep-695 type '_Literal695' in a " - "recursive fashion " - "without the recursed type being present in the " - "type_annotation_map is deprecated; add this type or its " - "recursed value to the type_annotation_map to allow it to " - "match explicitly." - ): - Foo = declare() - else: - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type when resolving " - r"for Python type " - r"indicated by '.+' inside the Mapped\[\] " - r"annotation for the 'status' attribute", - ): - declare() - return - class MixinTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default"