From a737e965c7edbda1cd89bd6d4a41ebcd157e9441 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 3 Sep 2025 21:44:33 -0400 Subject: [PATCH] liberalize pep695 matching after many days of discussion we are moving to liberalize the matching rules used for pep695 to the simple rule that we will resolve a pep695 type to its immediate ``__value__`` without requiring that it be present in the type map, however without any further recursive checks (that is, we will not resolve ``__value__`` of ``__value__``). This allows the vast majority of simple uses of pep695 types to not require entries in the type map, including when the type points to a simple Python type or any type that is present in the type_map. Also supported is resolution of generic pep695 types against the right side, including for Annotated types. The change here in 2.1 will form the base for a revised approach to the RegistryEvents patch for #9832, which will still provide the RegistryEvents.resolve_type_annotation hook. In 2.0, we need to scale back the warnings that are emitted so portions of this patch will also be backported including similar changes to the test suite. Fixes: #12829 Change-Id: Ib6e379793335da3f33f6ca2cd6874a6eaf1e36f4 --- doc/build/changelog/changelog_20.rst | 3 + doc/build/changelog/unreleased_20/12829.rst | 27 + doc/build/orm/declarative_tables.rst | 236 +- lib/sqlalchemy/orm/properties.py | 46 +- lib/sqlalchemy/util/typing.py | 5 +- .../test_tm_future_annotations_sync.py | 3149 +++++++++-------- test/orm/declarative/test_typed_mapping.py | 3149 +++++++++-------- 7 files changed, 3526 insertions(+), 3089 deletions(-) create mode 100644 doc/build/changelog/unreleased_20/12829.rst diff --git a/doc/build/changelog/changelog_20.rst b/doc/build/changelog/changelog_20.rst index c3ec1ed152..aa01eeabc9 100644 --- a/doc/build/changelog/changelog_20.rst +++ b/doc/build/changelog/changelog_20.rst @@ -717,6 +717,9 @@ :tags: bug, orm :tickets: 11955 + .. note:: this change has been revised in version 2.0.44. Simple matches + of ``TypeAliasType`` without a type map entry are no longer deprecated. + Consistently handle ``TypeAliasType`` (defined in PEP 695) obtained with the ``type X = int`` syntax introduced in python 3.12. Now in all cases one such alias must be explicitly added to the type map for it to be usable diff --git a/doc/build/changelog/unreleased_20/12829.rst b/doc/build/changelog/unreleased_20/12829.rst new file mode 100644 index 0000000000..f307545c3a --- /dev/null +++ b/doc/build/changelog/unreleased_20/12829.rst @@ -0,0 +1,27 @@ +.. change:: + :tags: usecase, orm + :tickets: 12829 + + The way ORM Annotated Declarative interprets Python :pep:`695` type aliases + in ``Mapped[]`` annotations has been refined to expand the lookup scheme. A + PEP 695 type can now be resolved based on either its direct presence in + :paramref:`_orm.registry.type_annotation_map` or its immediate resolved + value, as long as a recursive lookup across multiple pep-695 types is not + required for it to resolve. This change reverses part of the restrictions + introduced in 2.0.37 as part of :ticket:`11955`, which deprecated (and + disallowed in 2.1) the ability to resolve any PEP 695 type that was not + explicitly present in :paramref:`_orm.registry.type_annotation_map`. + Recursive lookups of PEP 695 types remains deprecated in 2.0 and disallowed + in version 2.1, as do implicit lookups of ``NewType`` types without an + entry in :paramref:`_orm.registry.type_annotation_map`. + + Additionally, new support has been added for generic PEP 695 aliases that + refer to PEP 593 ``Annotated`` constructs containing + :func:`_orm.mapped_column` configurations. See the sections below for + examples. + + .. seealso:: + + :ref:`orm_declarative_type_map_pep695_types` + + :ref:`orm_declarative_mapped_column_generic_pep593` diff --git a/doc/build/orm/declarative_tables.rst b/doc/build/orm/declarative_tables.rst index 50e68cb174..dff21e776b 100644 --- a/doc/build/orm/declarative_tables.rst +++ b/doc/build/orm/declarative_tables.rst @@ -814,7 +814,6 @@ is described in the next section, :ref:`orm_declarative_type_map_pep695_types`. Support for Type Alias Types (defined by PEP 695) and NewType ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - In contrast to the typing lookup described in :ref:`orm_declarative_type_map_union_types`, Python typing also includes two ways to create a composed type in a more formal way, using ``typing.NewType`` as @@ -823,13 +822,20 @@ differently from ordinary type aliases (i.e. assigning a type to a variable name), and this difference is honored in how SQLAlchemy resolves these types from the type map. -.. versionchanged:: 2.0.37 The behaviors described in this section for ``typing.NewType`` - as well as :pep:`695` ``type`` have been formalized and corrected. - Deprecation warnings are now emitted for "loose matching" patterns that have - worked in some 2.0 releases, but are to be removed in SQLAlchemy 2.1. +.. versionchanged:: 2.0.44 Support for resolving pep-695 types without a + corresponding entry in :paramref:`_orm.registry.type_annotation_map` + has been expanded, reversing part of the restrictions introduced in 2.0.37. Please ensure SQLAlchemy is up to date before attempting to use the features described in this section. +.. versionchanged:: 2.0.37 The behaviors described in this section for ``typing.NewType`` + as well as :pep:`695` ``type`` were formalized to disallow these types + from being implicitly resolvable without entries in + :paramref:`_orm.registry.type_annotation_map`, with deprecation warnings + emitted when these patterns were detected. As of 2.0.44, a pep-695 type + is implicitly resolvable as long as the type it resolves to is present + in the type map. + The typing module allows the creation of "new types" using ``typing.NewType``:: from typing import NewType @@ -837,110 +843,115 @@ The typing module allows the creation of "new types" using ``typing.NewType``:: nstr30 = NewType("nstr30", str) nstr50 = NewType("nstr50", str) -Additionally, in Python 3.12, a new feature defined by :pep:`695` was introduced which -provides the ``type`` keyword to accomplish a similar task; using -``type`` produces an object that is similar in many ways to ``typing.NewType`` -which is internally referred to as ``typing.TypeAliasType``:: +The ``NewType`` construct creates types that are analogous to creating a +subclass of the referenced type. + +Additionally, :pep:`695` introduced in Python 3.12 provides a new ``type`` +keyword for creating type aliases with greater separation of concerns from plain +aliases, as well as succinct support for generics without requiring explicit +use of ``TypeVar`` or ``Generic`` elements. Types created by the ``type`` +keyword are represented at runtime by ``typing.TypeAliasType``:: type SmallInt = int type BigInt = int type JsonScalar = str | float | bool | None -For the purposes of how SQLAlchemy treats these type objects when used -for SQL type lookup inside of :class:`_orm.Mapped`, it's important to note -that Python does not consider two equivalent ``typing.TypeAliasType`` -or ``typing.NewType`` objects to be equal:: - - # two typing.NewType objects are not equal even if they are both str - >>> nstr50 == nstr30 - False - - # two TypeAliasType objects are not equal even if they are both int - >>> SmallInt == BigInt - False - - # an equivalent union is not equal to JsonScalar - >>> JsonScalar == str | float | bool | None - False - -This is the opposite behavior from how ordinary unions are compared, and -informs the correct behavior for SQLAlchemy's ``type_annotation_map``. When -using ``typing.NewType`` or :pep:`695` ``type`` objects, the type object is -expected to be explicit within the ``type_annotation_map`` for it to be matched -from a :class:`_orm.Mapped` type, where the same object must be stated in order -for a match to be made (excluding whether or not the type inside of -:class:`_orm.Mapped` also unions on ``None``). This is distinct from the -behavior described at :ref:`orm_declarative_type_map_union_types`, where a -plain ``Union`` that is referenced directly will match to other ``Unions`` -based on the composition, rather than the object identity, of a particular type -in ``type_annotation_map``. - -In the example below, the composed types for ``nstr30``, ``nstr50``, -``SmallInt``, ``BigInt``, and ``JsonScalar`` have no overlap with each other -and can be named distinctly within each :class:`_orm.Mapped` construct, and -are also all explicit in ``type_annotation_map``. Any of these types may -also be unioned with ``None`` or declared as ``Optional[]`` without affecting -the lookup, only deriving column nullability:: +Both ``NewType`` and pep-695 ``type`` constructs may be used as arguments +within :class:`_orm.Mapped` annotations, where they will be resolved to Python +types using the following rules: - from typing import NewType +* When a ``TypeAliasType`` or ``NewType`` object is present in the + :paramref:`_orm.registry.type_annotation_map`, it will resolve directly:: - from sqlalchemy import SmallInteger, BigInteger, JSON, String - from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column - from sqlalchemy.schema import CreateTable + from typing import NewType + from sqlalchemy import String, BigInteger nstr30 = NewType("nstr30", str) - nstr50 = NewType("nstr50", str) - type SmallInt = int type BigInt = int - type JsonScalar = str | float | bool | None - class TABase(DeclarativeBase): - type_annotation_map = { - nstr30: String(30), - nstr50: String(50), - SmallInt: SmallInteger, - BigInteger: BigInteger, - JsonScalar: JSON, - } + class Base(DeclarativeBase): + type_annotation_map = {nstr30: String(30), BigInt: BigInteger} - class SomeClass(TABase): + class SomeClass(Base): __tablename__ = "some_table" - id: Mapped[int] = mapped_column(primary_key=True) - normal_str: Mapped[str] + # BigInt is in the type_annotation_map. So this + # will resolve to sqlalchemy.BigInteger + id: Mapped[BigInt] = mapped_column(primary_key=True) - short_str: Mapped[nstr30] - long_str_nullable: Mapped[nstr50 | None] + # nstr30 is in the type_annotation_map. So this + # will resolve to sqlalchemy.String(30) + data: Mapped[nstr30] - small_int: Mapped[SmallInt] - big_int: Mapped[BigInteger] - scalar_col: Mapped[JsonScalar] +* A ``TypeAliasType`` that refers **directly** to another type present + in the type map will resolve against that type:: -a CREATE TABLE for the above mapping will illustrate the different variants -of integer and string we've configured, and looks like: + type PlainInt = int -.. sourcecode:: pycon+sql - >>> print(CreateTable(SomeClass.__table__)) - {printsql}CREATE TABLE some_table ( - id INTEGER NOT NULL, - normal_str VARCHAR NOT NULL, - short_str VARCHAR(30) NOT NULL, - long_str_nullable VARCHAR(50), - small_int SMALLINT NOT NULL, - big_int BIGINT NOT NULL, - scalar_col JSON, - PRIMARY KEY (id) - ) + class Base(DeclarativeBase): + pass + + + class SomeClass(Base): + __tablename__ = "some_table" + + # PlainInt refers to int, which is one of the default types + # already in the type_annotation_map. So this + # will resolve to sqlalchemy.Integer via the int type + id: Mapped[PlainInt] = mapped_column(primary_key=True) + +* A ``TypeAliasType`` that refers to another pep-695 ``TypeAliasType`` + not present in the type map will not resolve (emits a deprecation + warning in 2.0), as this would involve a recursive lookup:: + + type PlainInt = int + type AlsoAnInt = PlainInt + + + class Base(DeclarativeBase): + pass + + + class SomeClass(Base): + __tablename__ = "some_table" + + # AlsoAnInt refers to PlainInt, which is not in the type_annotation_map. + # This will emit a deprecation warning in 2.0, will fail in 2.1 + id: Mapped[AlsoAnInt] = mapped_column(primary_key=True) -Regarding nullability, the ``JsonScalar`` type includes ``None`` in its -definition, which indicates a nullable column. Similarly the -``long_str_nullable`` column applies a union of ``None`` to ``nstr50``, -which matches to the ``nstr50`` type in the ``type_annotation_map`` while -also applying nullability to the mapped column. The other columns all remain -NOT NULL as they are not indicated as optional. +* A ``NewType`` that is not in the type map will not resolve (emits a + deprecation warning in 2.0). Since ``NewType`` is analogous to creating an + entirely new type with different semantics than the type it extends, these + must be explicitly matched in the type map:: + + + from typing import NewType + + nstr30 = NewType("nstr30", str) + + + class Base(DeclarativeBase): + pass + + + class SomeClass(Base): + __tablename__ = "some_table" + + # a NewType is a new kind of type, so this will emit a deprecation + # warning in 2.0 and fail in 2.1, as nstr30 is not present + # in the type_annotation_map. + id: Mapped[nstr30] = mapped_column(primary_key=True) + +For all of the above examples, any type that is combined with ``Optional[]`` +or ``| None`` will consider this to indicate the column is nullable, if +no other directive for nullability is present. + +.. seealso:: + + :ref:`orm_declarative_mapped_column_generic_pep593` .. _orm_declarative_mapped_column_type_map_pep593: @@ -1231,6 +1242,57 @@ adding a ``FOREIGN KEY`` constraint as well as substituting will raise a ``NotImplementedError`` exception at runtime, but may be implemented in future releases. + +.. _orm_declarative_mapped_column_generic_pep593: + +Mapping Whole Column Declarations to Generic Python Types +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Using the ``Annotated`` approach from the previous section, we may also +create a generic version that will apply particular :func:`_orm.mapped_column` +elements across many different Python/SQL types in one step. Below +illustrates a plain alias against a generic form of ``Annotated`` that +will apply the ``primary_key=True`` option to any column to which it's applied:: + + from typing import Annotated + from typing import TypeVar + + T = TypeVar("T", bound=Any) + + PrimaryKey = Annotated[T, mapped_column(primary_key=True)] + +The above type can now apply ``primary_key=True`` to any Python type:: + + import uuid + + + class Base(DeclarativeBase): + pass + + + class A(Base): + __tablename__ = "a" + + # will create an Integer primary key + id: Mapped[PrimaryKey[int]] + + + class B(Base): + __tablename__ = "b" + + # will create a UUID primary key + id: Mapped[PrimaryKey[uuid.UUID]] + +The type alias may also be defined equivalently using the pep-695 ``type`` +keyword in Python 3.12 or above:: + + type PrimaryKey[T] = Annotated[T, mapped_column(primary_key=True)] + +.. versionadded:: 2.0.44 Generic pep-695 types may be used with pep-593 + ``Annotated`` elements to create generic types that automatically + deliver :func:`_orm.mapped_column` arguments. + + .. _orm_declarative_mapped_column_enums: Using Python ``Enum`` or pep-586 ``Literal`` types in the type map diff --git a/lib/sqlalchemy/orm/properties.py b/lib/sqlalchemy/orm/properties.py index 5c5cac7ff4..5623e61623 100644 --- a/lib/sqlalchemy/orm/properties.py +++ b/lib/sqlalchemy/orm/properties.py @@ -788,11 +788,19 @@ class MappedColumn( if not self._has_nullable: self.column.nullable = nullable - our_type = de_optionalize_union_types(argument) - find_mapped_in: Tuple[Any, ...] = () our_type_is_pep593 = False raw_pep_593_type = None + raw_pep_695_type = None + + our_type: Any = de_optionalize_union_types(argument) + + if is_pep695(our_type): + raw_pep_695_type = our_type + our_type = de_optionalize_union_types(raw_pep_695_type.__value__) + our_args = get_args(raw_pep_695_type) + if our_args: + our_type = our_type[our_args] if is_pep593(our_type): our_type_is_pep593 = True @@ -802,9 +810,6 @@ class MappedColumn( if nullable: raw_pep_593_type = de_optionalize_union_types(raw_pep_593_type) find_mapped_in = pep_593_components[1:] - elif is_pep695(argument) and is_pep593(argument.__value__): - # do not support nested annotation inside unions ets - find_mapped_in = get_args(argument.__value__)[1:] use_args_from: Optional[MappedColumn[Any]] for elem in find_mapped_in: @@ -900,6 +905,9 @@ class MappedColumn( else: checks = [our_type] + if raw_pep_695_type is not None: + checks.insert(0, raw_pep_695_type) + for check_type in checks: new_sqltype = registry._resolve_type(check_type) if new_sqltype is not None: @@ -914,17 +922,35 @@ class MappedColumn( "attribute Mapped annotation is the SQLAlchemy type " f"{our_type}. Expected a Python type instead" ) - elif is_a_type(our_type): + elif is_a_type(checks[0]): + if len(checks) == 1: + detail = ( + "the type object is not resolvable by the registry" + ) + elif len(checks) == 2: + detail = ( + f"neither '{checks[0]}' nor '{checks[1]}' " + "are resolvable by the registry" + ) + else: + detail = ( + f"""none of { + ", ".join(f"'{t}'" for t in checks) + } """ + "are resolvable by the registry" + ) raise orm_exc.MappedAnnotationError( - "Could not locate SQLAlchemy Core type for Python " - f"type {our_type} inside the {self.column.key!r} " - "attribute Mapped annotation" + "Could not locate SQLAlchemy Core type when resolving " + f"for Python type indicated by '{checks[0]}' inside " + "the " + f"Mapped[] annotation for the {self.column.key!r} " + f"attribute; {detail}" ) else: raise orm_exc.MappedAnnotationError( f"The object provided inside the {self.column.key!r} " "attribute Mapped annotation is not a Python type, " - f"it's the object {our_type!r}. Expected a Python " + f"it's the object {argument!r}. Expected a Python " "type." ) diff --git a/lib/sqlalchemy/util/typing.py b/lib/sqlalchemy/util/typing.py index 439bbb85e7..91a2380109 100644 --- a/lib/sqlalchemy/util/typing.py +++ b/lib/sqlalchemy/util/typing.py @@ -549,8 +549,9 @@ def includes_none(type_: Any) -> bool: def is_a_type(type_: Any) -> bool: return ( isinstance(type_, type) - or hasattr(type_, "__origin__") - or type_.__module__ in ("typing", "typing_extensions") + or get_origin(type_) is not None + or getattr(type_, "__module__", None) + in ("typing", "typing_extensions") or type(type_).__mro__[0].__module__ in ("typing", "typing_extensions") ) diff --git a/test/orm/declarative/test_tm_future_annotations_sync.py b/test/orm/declarative/test_tm_future_annotations_sync.py index 4a3b50596f..960f4c0c9c 100644 --- a/test/orm/declarative/test_tm_future_annotations_sync.py +++ b/test/orm/declarative/test_tm_future_annotations_sync.py @@ -164,6 +164,19 @@ _TypingLiteral695 = TypingTypeAliasType( ) _RecursiveLiteral695 = TypeAliasType("_RecursiveLiteral695", _Literal695) +_GenericPep593TypeAlias = Annotated[TV, mapped_column(info={"hi": "there"})] + +_GenericPep593Pep695 = TypingTypeAliasType( + "_GenericPep593Pep695", + Annotated[TV, mapped_column(info={"hi": "there"})], + type_params=(TV,), +) + +_RecursivePep695Pep593 = TypingTypeAliasType( + "_RecursivePep695Pep593", + Annotated[_TypingStrPep695, mapped_column(info={"hi": "there"})], +) + def expect_annotation_syntax_error(name): return expect_raises_message( @@ -328,31 +341,6 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert Child.__mapper__.attrs.parent.strategy.use_get - @testing.combinations( - (BIGINT(),), - (BIGINT,), - (Integer().with_variant(BIGINT, "default")), - (Integer().with_variant(BIGINT(), "default")), - (BIGINT().with_variant(String(), "some_other_dialect")), - ) - def test_type_map_varieties(self, typ): - Base = declarative_base(type_annotation_map={int: typ}) - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - x: Mapped[int] - y: Mapped[int] = mapped_column() - z: Mapped[int] = mapped_column(typ) - - self.assert_compile( - CreateTable(MyClass.__table__), - "CREATE TABLE mytable (id BIGINT NOT NULL, " - "x BIGINT NOT NULL, y BIGINT NOT NULL, z BIGINT NOT NULL, " - "PRIMARY KEY (id))", - ) - def test_required_no_arg(self, decl_base): with expect_raises_message( sa_exc.ArgumentError, @@ -608,198 +596,6 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(User.__table__.c.data.nullable) assert isinstance(User.__table__.c.created_at.type, DateTime) - def test_construct_lhs_type_missing(self, decl_base): - global MyClass - - class MyClass: - pass - - with expect_raises_message( - sa_exc.ArgumentError, - "Could not locate SQLAlchemy Core type for Python type " - ".*MyClass.* inside the 'data' attribute Mapped annotation", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[MyClass] = mapped_column() - - @testing.variation( - "argtype", - [ - "type", - "column", - "mapped_column", - "column_class", - "ref_to_type", - "ref_to_column", - ], - ) - def test_construct_lhs_sqlalchemy_type(self, decl_base, argtype): - """test for #12329. - - of note here are all the different messages we have for when the - wrong thing is put into Mapped[], and in fact in #12329 we added - another one. - - This is a lot of different messages, but at the same time they - occur at different places in the interpretation of types. If - we were to centralize all these messages, we'd still likely end up - doing distinct messages for each scenario, so instead we added - a new ArgumentError subclass MappedAnnotationError that provides - some commonality to all of these cases. - - - """ - expect_future_annotations = "annotations" in globals() - - if argtype.type: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, type is - # a SQL type - "The type provided inside the 'data' attribute Mapped " - "annotation is the SQLAlchemy type .*BigInteger.*. Expected " - "a Python type instead", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[BigInteger] = mapped_column() - - elif argtype.column: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # util.py -> _extract_mapped_subtype - ( - re.escape( - "Could not interpret annotation " - "Mapped[Column('q', BigInteger)]." - ) - if expect_future_annotations - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - else re.escape( - "The object provided inside the 'data' attribute " - "Mapped annotation is not a Python type, it's the " - "object Column('q', BigInteger(), table=None). " - "Expected a Python type." - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[Column("q", BigInteger)] = ( # noqa: F821 - mapped_column() - ) - - elif argtype.mapped_column: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - # interestingly, this raises at the same point for both - # future annotations mode and legacy annotations mode - r"The object provided inside the 'data' attribute " - "Mapped annotation is not a Python type, it's the object " - r"\. " - "Expected a Python type.", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - big_integer: Mapped[int] = mapped_column() - data: Mapped[big_integer] = mapped_column() - - elif argtype.column_class: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, type is not - # a SQL type - re.escape( - "Could not locate SQLAlchemy Core type for Python type " - " inside the " - "'data' attribute Mapped annotation" - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[Column] = mapped_column() - - elif argtype.ref_to_type: - mytype = BigInteger - with expect_raises_message( - orm_exc.MappedAnnotationError, - ( - # decl_base.py -> _exract_mappable_attributes - re.escape( - "Could not resolve all types within mapped " - 'annotation: "Mapped[mytype]"' - ) - if expect_future_annotations - # properties.py -> _init_column_for_annotation, type is - # a SQL type - else re.escape( - "The type provided inside the 'data' attribute Mapped " - "annotation is the SQLAlchemy type " - ". " - "Expected a Python type instead" - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[mytype] = mapped_column() - - elif argtype.ref_to_column: - mycol = Column("q", BigInteger) - - with expect_raises_message( - orm_exc.MappedAnnotationError, - # decl_base.py -> _exract_mappable_attributes - ( - re.escape( - "Could not resolve all types within mapped " - 'annotation: "Mapped[mycol]"' - ) - if expect_future_annotations - else - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - re.escape( - "The object provided inside the 'data' attribute " - "Mapped " - "annotation is not a Python type, it's the object " - "Column('q', BigInteger(), table=None). " - "Expected a Python type." - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[mycol] = mapped_column() - - else: - argtype.fail() - def test_construct_rhs_type_override_lhs(self, decl_base): class Element(decl_base): __tablename__ = "element" @@ -965,1175 +761,1672 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(User.__table__.c.lnl_rnl._copy().nullable) def test_fwd_refs(self, decl_base: Type[DeclarativeBase]): + # TODO: add an assertion? class MyClass(decl_base): __tablename__ = "my_table" id: Mapped["int"] = mapped_column(primary_key=True) data_one: Mapped["str"] - def test_pep593_types_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] - ): - """neat!!!""" - global str50, str30, opt_str50, opt_str30 + def test_typing_literal_identity(self, decl_base): + """See issue #11820""" - str50 = Annotated[str, 50] - str30 = Annotated[str, 30] - opt_str50 = Optional[str50] - opt_str30 = Optional[str30] + class Foo(decl_base): + __tablename__ = "footable" - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30)} - ) + id: Mapped[int] = mapped_column(primary_key=True) + t: Mapped[_TypingLiteral] + te: Mapped[_TypingExtensionsLiteral] - class MyClass(decl_base): - __tablename__ = "my_table" + for col in (Foo.__table__.c.t, Foo.__table__.c.te): + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["a", "b"]) + is_(col.type.native_enum, False) - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[opt_str30] - data_three: Mapped[str50] - data_four: Mapped[opt_str50] - data_five: Mapped[str] - data_six: Mapped[Optional[str]] + def test_we_got_all_attrs_test_annotated(self): + argnames = _py_inspect.getfullargspec(mapped_column) + assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( + f"annotated attributes were not tested: " + f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" + ) - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + @annotated_name_test_cases( + ("sort_order", 100, lambda sort_order: sort_order == 100), + ("nullable", False, lambda column: column.nullable is False), + ( + "active_history", + True, + lambda column_property: column_property.active_history is True, + ), + ( + "deferred", + True, + lambda column_property: column_property.deferred is True, + ), + ( + "deferred", + _NoArg.NO_ARG, + lambda column_property: column_property is None, + ), + ( + "deferred_group", + "mygroup", + lambda column_property: column_property.deferred is True + and column_property.group == "mygroup", + ), + ( + "deferred_raiseload", + True, + lambda column_property: column_property.deferred is True + and column_property.raiseload is True, + ), + ( + "server_default", + "25", + lambda column: column.server_default.arg == "25", + ), + ( + "server_onupdate", + "25", + lambda column: column.server_onupdate.arg == "25", + ), + ( + "default", + 25, + lambda column: column.default.arg == 25, + ), + ( + "insert_default", + 25, + lambda column: column.default.arg == 25, + ), + ( + "onupdate", + 25, + lambda column: column.onupdate.arg == 25, + ), + ("doc", "some doc", lambda column: column.doc == "some doc"), + ( + "comment", + "some comment", + lambda column: column.comment == "some comment", + ), + ("index", True, lambda column: column.index is True), + ("index", _NoArg.NO_ARG, lambda column: column.index is None), + ("index", False, lambda column: column.index is False), + ("unique", True, lambda column: column.unique is True), + ("unique", False, lambda column: column.unique is False), + ("autoincrement", True, lambda column: column.autoincrement is True), + ("system", True, lambda column: column.system is True), + ("primary_key", True, lambda column: column.primary_key is True), + ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), + ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), + ( + "use_existing_column", + True, + lambda mc: mc._use_existing_column is True, + ), + ( + "quote", + True, + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "key", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "name", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "kw_only", + True, + exc.SADeprecationWarning( + "Argument 'kw_only' is a dataclass argument " + ), + ), + ( + "compare", + True, + exc.SADeprecationWarning( + "Argument 'compare' is a dataclass argument " + ), + ), + ( + "default_factory", + lambda: 25, + exc.SADeprecationWarning( + "Argument 'default_factory' is a dataclass argument " + ), + ), + ( + "repr", + True, + exc.SADeprecationWarning( + "Argument 'repr' is a dataclass argument " + ), + ), + ( + "init", + True, + exc.SADeprecationWarning( + "Argument 'init' is a dataclass argument" + ), + ), + ( + "hash", + True, + exc.SADeprecationWarning( + "Argument 'hash' is a dataclass argument" + ), + ), + ( + "dataclass_metadata", + {}, + exc.SADeprecationWarning( + "Argument 'dataclass_metadata' is a dataclass argument" + ), + ), + argnames="argname, argument, assertion", + ) + @testing.variation("use_annotated", [True, False, "control"]) + def test_names_encountered_for_annotated( + self, argname, argument, assertion, use_annotated, decl_base + ): + global myint + + if argument is not _NoArg.NO_ARG: + kw = {argname: argument} + + if argname == "quote": + kw["name"] = "somename" + else: + kw = {} + + is_warning = isinstance(assertion, exc.SADeprecationWarning) + is_dataclass = argname in ( + "kw_only", + "init", + "repr", + "compare", + "default_factory", + "hash", + "dataclass_metadata", + ) + + if is_dataclass: + + class Base(MappedAsDataclass, decl_base): + __abstract__ = True + + else: + Base = decl_base + + if use_annotated.control: + # test in reverse; that kw set on the main mapped_column() takes + # effect when the Annotated is there also and does not have the + # kw + amc = mapped_column() + myint = Annotated[int, amc] + + mc = mapped_column(**kw) + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + elif use_annotated: + amc = mapped_column(**kw) + myint = Annotated[int, amc] + + mc = mapped_column() + + if is_warning: + with expect_deprecated(assertion.args[0]): + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + else: + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + else: + mc = cast(MappedColumn, mapped_column(**kw)) + + mapper_prop = mc.mapper_property_to_assign + column_to_assign, sort_order = mc.columns_to_assign[0] + + if not is_warning: + assert_result = testing.resolve_lambda( + assertion, + sort_order=sort_order, + column_property=mapper_prop, + column=column_to_assign, + mc=mc, + ) + assert assert_result + elif is_dataclass and (not use_annotated or use_annotated.control): + eq_( + getattr(mc._attribute_options, f"dataclasses_{argname}"), + argument, + ) + + @testing.combinations(("index",), ("unique",), argnames="paramname") + @testing.combinations((True,), (False,), (None,), argnames="orig") + @testing.combinations((True,), (False,), (None,), argnames="merging") + def test_index_unique_combinations( + self, paramname, orig, merging, decl_base + ): + """test #11091""" + + global myint + + amc = mapped_column(**{paramname: merging}) + myint = Annotated[int, amc] + + mc = mapped_column(**{paramname: orig}) + + class User(decl_base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + result = getattr(User.__table__.c.myname, paramname) + if orig is None: + is_(result, merging) + else: + is_(result, orig) + + def test_missing_mapped_lhs(self, decl_base): + with expect_annotation_syntax_error("User.name"): + + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: str = mapped_column() # type: ignore + + def test_construct_lhs_separate_name(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + data: Mapped[Optional[str]] = mapped_column("the_data") + + self.assert_compile( + select(User.data), "SELECT users.the_data FROM users" + ) + is_true(User.__table__.c.the_data.nullable) + + def test_construct_works_in_expr(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + + class Address(decl_base): + __tablename__ = "addresses" + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) + + user = relationship(User, primaryjoin=user_id == User.id) + + self.assert_compile( + select(Address.user_id, User.id).join(Address.user), + "SELECT addresses.user_id, users.id FROM addresses " + "JOIN users ON addresses.user_id = users.id", + ) + + def test_construct_works_as_polymorphic_on(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + type: Mapped[str] = mapped_column() + + __mapper_args__ = {"polymorphic_on": type} + + decl_base.registry.configure() + is_(User.__table__.c.type, User.__mapper__.polymorphic_on) + + def test_construct_works_as_version_id_col(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + version_id: Mapped[int] = mapped_column() + + __mapper_args__ = {"version_id_col": version_id} + + decl_base.registry.configure() + is_(User.__table__.c.version_id, User.__mapper__.version_id_col) + + def test_construct_works_in_deferred(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = deferred(mapped_column()) + + self.assert_compile(select(User), "SELECT users.id FROM users") + self.assert_compile( + select(User).options(undefer(User.data)), + "SELECT users.id, users.data FROM users", + ) + + def test_deferred_kw(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = mapped_column(deferred=True) + + self.assert_compile(select(User), "SELECT users.id FROM users") + self.assert_compile( + select(User).options(undefer(User.data)), + "SELECT users.id, users.data FROM users", + ) + + +class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): + __dialect__ = "default" + + def test_extract_from_pep593(self, decl_base): + global Address + + @dataclasses.dataclass + class Address: + street: str + state: str + zip_: str + + class User(decl_base): + __tablename__ = "user" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + + address: Mapped[Annotated[Address, "foo"]] = composite( + mapped_column(), mapped_column(), mapped_column("zip") + ) + + self.assert_compile( + select(User), + 'SELECT "user".id, "user".name, "user".street, ' + '"user".state, "user".zip FROM "user"', + dialect="default", + ) + + def test_pep593_types_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + """neat!!!""" + global str50, str30, opt_str50, opt_str30 + + str50 = Annotated[str, 50] + str30 = Annotated[str, 30] + opt_str50 = Optional[str50] + opt_str30 = Optional[str30] + + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30)} + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[opt_str30] + data_three: Mapped[str50] + data_four: Mapped[opt_str50] + data_five: Mapped[str] + data_six: Mapped[Optional[str]] + + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) eq_(MyClass.__table__.c.data_two.type.length, 30) is_true(MyClass.__table__.c.data_two.nullable) eq_(MyClass.__table__.c.data_three.type.length, 50) - def test_plain_typealias_as_typemap_keys( + @testing.variation( + "alias_type", + [ + "none", + "typekeyword", + "typekeyword_unpopulated", + "typealias", + "typekeyword_nested", + ], + ) + @testing.requires.python312 + def test_extract_pep593_from_pep695( + self, decl_base: Type[DeclarativeBase], alias_type + ): + """test #11130""" + if alias_type.typekeyword: + decl_base.registry.update_type_annotation_map( + {strtypalias_keyword: VARCHAR(33)} # noqa: F821 + ) + if alias_type.typekeyword_nested: + decl_base.registry.update_type_annotation_map( + {strtypalias_keyword_nested: VARCHAR(42)} # noqa: F821 + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + if alias_type.typekeyword or alias_type.typekeyword_unpopulated: + data_one: Mapped[strtypalias_keyword] # noqa: F821 + elif alias_type.typealias: + data_one: Mapped[strtypalias_ta] # noqa: F821 + elif alias_type.none: + data_one: Mapped[strtypalias_plain] # noqa: F821 + elif alias_type.typekeyword_nested: + data_one: Mapped[strtypalias_keyword_nested] # noqa: F821 + else: + alias_type.fail() + + table = MyClass.__table__ + assert table is not None + + if alias_type.typekeyword_nested: + # a nested annotation is not supported + eq_(MyClass.data_one.expression.info, {}) + else: + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + + if alias_type.typekeyword: + eq_(MyClass.data_one.type.length, 33) + elif alias_type.typekeyword_nested: + eq_(MyClass.data_one.type.length, 42) + else: + eq_(MyClass.data_one.type.length, None) + + @testing.requires.python312 + def test_no_recursive_pep593_from_pep695( + self, decl_base: Type[DeclarativeBase] + ): + def declare(): + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 + + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving for Python " + r"type " + r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " + r"annotation for the 'data_one' attribute; none of " + r"'_RecursivePep695Pep593', " + r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " + r"are resolvable by the registry", + ): + declare() + + def test_extract_base_type_from_pep593( self, decl_base: Type[DeclarativeBase] ): - decl_base.registry.update_type_annotation_map( - {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} - ) + """base type is extracted from an Annotated structure if not otherwise + in the type lookup dictionary""" - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[_StrTypeAlias] - structure: Mapped[_UnionTypeAlias] + class MyClass(decl_base): + __tablename__ = "my_table" - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + id: Mapped[Annotated[Annotated[int, "q"], "t"]] = mapped_column( + primary_key=True + ) - @testing.variation( - "option", - [ - "plain", - "union", - "union_604", - "null", - "union_null", - "union_null_604", - "optional", - "optional_union", - "optional_union_604", - "union_newtype", - "union_null_newtype", - "union_695", - "union_null_695", - ], - ) - @testing.variation("in_map", ["yes", "no", "value"]) - @testing.requires.python312 - def test_pep695_behavior(self, decl_base, in_map, option): - """Issue #11955""" - global tat + is_(MyClass.__table__.c.id.type._type_affinity, Integer) - if option.plain: - tat = TypeAliasType("tat", str) - elif option.union: - tat = TypeAliasType("tat", Union[str, int]) - elif option.union_604: - tat = TypeAliasType("tat", str | int) - elif option.null: - tat = TypeAliasType("tat", None) - elif option.union_null: - tat = TypeAliasType("tat", Union[str, int, None]) - elif option.union_null_604: - tat = TypeAliasType("tat", str | int | None) - elif option.optional: - tat = TypeAliasType("tat", Optional[str]) - elif option.optional_union: - tat = TypeAliasType("tat", Optional[Union[str, int]]) - elif option.optional_union_604: - tat = TypeAliasType("tat", Optional[str | int]) - elif option.union_newtype: - # this seems to be illegal for typing but "works" - tat = NewType("tat", Union[str, int]) - elif option.union_null_newtype: - # this seems to be illegal for typing but "works" - tat = NewType("tat", Union[str, int, None]) - elif option.union_695: - tat = TypeAliasType("tat", str | int) - elif option.union_null_695: - tat = TypeAliasType("tat", str | int | None) - else: - option.fail() + def test_extract_sqla_from_pep593_not_yet( + self, decl_base: Type[DeclarativeBase] + ): + """https://twitter.com/zzzeek/status/1536693554621341697""" - if in_map.yes: - decl_base.registry.update_type_annotation_map({tat: String(99)}) - elif in_map.value and "newtype" not in option.name: - decl_base.registry.update_type_annotation_map( - {tat.__value__: String(99)} - ) + global SomeRelated - def declare(): - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[tat] + class SomeRelated(decl_base): + __tablename__: ClassVar[Optional[str]] = "some_related" + id: Mapped["int"] = mapped_column(primary_key=True) - return Test.__table__.c.data + with expect_raises_message( + NotImplementedError, + r"Use of the 'Relationship' construct inside of an Annotated " + r"object is not yet supported.", + ): - if in_map.yes: - col = declare() - is_true(isinstance(col.type, String)) - eq_(col.type.length, 99) - nullable = "null" in option.name or "optional" in option.name - eq_(col.nullable, nullable) + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped["int"] = mapped_column(primary_key=True) + data_one: Mapped[Annotated["SomeRelated", relationship()]] + + def test_extract_sqla_from_pep593_plain( + self, decl_base: Type[DeclarativeBase] + ): + """extraction of mapped_column() from the Annotated type + + https://twitter.com/zzzeek/status/1536693554621341697""" + global intpk, strnone, str30nullable + global opt_strnone, opt_str30 + + intpk = Annotated[int, mapped_column(primary_key=True)] + + strnone = Annotated[str, mapped_column()] # str -> NOT NULL + str30nullable = Annotated[ + str, mapped_column(String(30), nullable=True) # nullable -> NULL + ] + opt_strnone = Optional[strnone] # Optional[str] -> NULL + opt_str30 = Optional[str30nullable] # nullable -> NULL + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + class MyOtherClass(decl_base): + __tablename__ = "my_other_table" + + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + for cls in MyClass, MyOtherClass: + table = cls.__table__ + assert table is not None + + is_(table.c.id.primary_key, True) + is_(table.c.id.table, table) + + eq_(table.c.data_one.type.length, None) + eq_(table.c.data_two.type.length, 30) + eq_(table.c.data_three.type.length, None) + + is_false(table.c.data_one.nullable) + is_true(table.c.data_two.nullable) + is_true(table.c.data_three.nullable) + is_true(table.c.data_four.nullable) + + def test_extract_sqla_from_pep593_mixin( + self, decl_base: Type[DeclarativeBase] + ): + """extraction of mapped_column() from the Annotated type + + https://twitter.com/zzzeek/status/1536693554621341697""" + + global intpk, strnone, str30nullable + global opt_strnone, opt_str30 + intpk = Annotated[int, mapped_column(primary_key=True)] + + strnone = Annotated[str, mapped_column()] # str -> NOT NULL + str30nullable = Annotated[ + str, mapped_column(String(30), nullable=True) # nullable -> NULL + ] + opt_strnone = Optional[strnone] # Optional[str] -> NULL + opt_str30 = Optional[str30nullable] # nullable -> NULL + + class HasPk: + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + + class MyClass(HasPk, decl_base): + __tablename__ = "my_table" + + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + table = MyClass.__table__ + assert table is not None + + is_(table.c.id.primary_key, True) + is_(table.c.id.table, table) + + eq_(table.c.data_one.type.length, None) + eq_(table.c.data_two.type.length, 30) + eq_(table.c.data_three.type.length, None) + + is_false(table.c.data_one.nullable) + is_true(table.c.data_two.nullable) + is_true(table.c.data_three.nullable) + is_true(table.c.data_four.nullable) + + @testing.variation("to_assert", ["ddl", "fkcount", "references"]) + @testing.variation("assign_blank", [True, False]) + def test_extract_fk_col_from_pep593( + self, decl_base: Type[DeclarativeBase], to_assert, assign_blank + ): + global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] + element_ref = Annotated[int, mapped_column(ForeignKey("element.id"))] + + class Element(decl_base): + __tablename__ = "element" + + id: Mapped[intpk] + + class RefElementOne(decl_base): + __tablename__ = "refone" + + id: Mapped[intpk] + + if assign_blank: + other_id: Mapped[element_ref] = mapped_column() + else: + other_id: Mapped[element_ref] + + class RefElementTwo(decl_base): + __tablename__ = "reftwo" + + id: Mapped[intpk] + if assign_blank: + some_id: Mapped[element_ref] = mapped_column() + else: + some_id: Mapped[element_ref] + + assert Element.__table__ is not None + assert RefElementOne.__table__ is not None + assert RefElementTwo.__table__ is not None + + if to_assert.fkcount: + # test #9766 + eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1) + eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1) + elif to_assert.references: + is_true( + RefElementOne.__table__.c.other_id.references( + Element.__table__.c.id + ) + ) + is_true( + RefElementTwo.__table__.c.some_id.references( + Element.__table__.c.id + ) + ) + elif to_assert.ddl: + self.assert_compile( + CreateTable(RefElementOne.__table__), + "CREATE TABLE refone " + "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, " + "PRIMARY KEY (id), " + "FOREIGN KEY(other_id) REFERENCES element (id))", + ) + self.assert_compile( + CreateTable(RefElementTwo.__table__), + "CREATE TABLE reftwo " + "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, " + "PRIMARY KEY (id), " + "FOREIGN KEY(some_id) REFERENCES element (id))", + ) else: - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type for Python type .*tat " - "inside the 'data' attribute Mapped annotation", - ): - declare() + to_assert.fail() - @testing.variation( - "type_", - [ - "str_extension", - "str_typing", - "generic_extension", - "generic_typing", - "generic_typed_extension", - "generic_typed_typing", - ], + @testing.combinations( + (collections.abc.Sequence, (str,)), + (collections.abc.MutableSequence, (str,)), + (collections.abc.Mapping, (str, str)), + (collections.abc.MutableMapping, (str, str)), + (typing.Mapping, (str, str)), + (typing.MutableMapping, (str, str)), + (typing.Sequence, (str,)), + (typing.MutableSequence, (str,)), + (list, (str,)), + (List, (str,)), + (dict, (str, str)), + (Dict, (str, str)), + (list, None), + (List, None), + (dict, None), + (Dict, None), + id_="sa", + argnames="container_typ,args", ) - @testing.requires.python312 - def test_pep695_typealias_as_typemap_keys( - self, decl_base: Type[DeclarativeBase], type_ - ): - """test #10807""" + @testing.variation("style", ["pep593", "alias", "direct"]) + def test_extract_composed(self, container_typ, args, style): + """test #9099 (pep593) - decl_base.registry.update_type_annotation_map( - { - _UnionPep695: JSON, - _StrPep695: String(30), - _TypingStrPep695: String(30), - _GenericPep695: String(30), - _TypingGenericPep695: String(30), - _GenericPep695Typed: String(30), - _TypingGenericPep695Typed: String(30), - } - ) + test #11814 - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - if type_.str_extension: - data: Mapped[_StrPep695] - elif type_.str_typing: - data: Mapped[_TypingStrPep695] - elif type_.generic_extension: - data: Mapped[_GenericPep695] - elif type_.generic_typing: - data: Mapped[_TypingGenericPep695] - elif type_.generic_typed_extension: - data: Mapped[_GenericPep695Typed] - elif type_.generic_typed_typing: - data: Mapped[_TypingGenericPep695Typed] - else: - type_.fail() - structure: Mapped[_UnionPep695] + test #11831, regression from #11814 + """ - eq_(Test.__table__.c.data.type._type_affinity, String) - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + global TestType - @testing.variation( - "alias_type", - ["none", "typekeyword", "typealias", "typekeyword_nested"], - ) - @testing.requires.python312 - def test_extract_pep593_from_pep695( - self, decl_base: Type[DeclarativeBase], alias_type - ): - """test #11130""" - if alias_type.typekeyword: - decl_base.registry.update_type_annotation_map( - {strtypalias_keyword: VARCHAR(33)} # noqa: F821 - ) - if alias_type.typekeyword_nested: - decl_base.registry.update_type_annotation_map( - {strtypalias_keyword_nested: VARCHAR(42)} # noqa: F821 - ) + if style.pep593: + if args is None: + TestType = Annotated[container_typ, 0] + else: + TestType = Annotated[container_typ[args], 0] + elif style.alias: + if args is None: + TestType = container_typ + else: + TestType = container_typ[args] + elif style.direct: + TestType = container_typ - class MyClass(decl_base): + class Base(DeclarativeBase): + if style.direct: + if args == (str, str): + type_annotation_map = {TestType[str, str]: JSON()} + elif args is None: + type_annotation_map = {TestType: JSON()} + else: + type_annotation_map = {TestType[str]: JSON()} + else: + type_annotation_map = {TestType: JSON()} + + class MyClass(Base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - if alias_type.typekeyword: - data_one: Mapped[strtypalias_keyword] # noqa: F821 - elif alias_type.typealias: - data_one: Mapped[strtypalias_ta] # noqa: F821 - elif alias_type.none: - data_one: Mapped[strtypalias_plain] # noqa: F821 - elif alias_type.typekeyword_nested: - data_one: Mapped[strtypalias_keyword_nested] # noqa: F821 + if style.direct: + if args == (str, str): + data: Mapped[TestType[str, str]] = mapped_column() + elif args is None: + data: Mapped[TestType] = mapped_column() + else: + data: Mapped[TestType[str]] = mapped_column() else: - alias_type.fail() + data: Mapped[TestType] = mapped_column() - table = MyClass.__table__ - assert table is not None + is_(MyClass.__table__.c.data.type._type_affinity, JSON) - if alias_type.typekeyword_nested: - # a nested annotation is not supported - eq_(MyClass.data_one.expression.info, {}) - else: - eq_(MyClass.data_one.expression.info, {"hi": "there"}) + @testing.combinations( + ("default", lambda ctx: 10), + ("default", func.foo()), + ("onupdate", lambda ctx: 10), + ("onupdate", func.foo()), + ("server_onupdate", func.foo()), + ("server_default", func.foo()), + ("server_default", Identity()), + ("nullable", True), + ("nullable", False), + ("type", BigInteger()), + ("index", True), + ("unique", True), + argnames="paramname, value", + ) + @testing.combinations(True, False, argnames="optional") + @testing.combinations(True, False, argnames="include_existing_col") + def test_combine_args_from_pep593( + self, + decl_base: Type[DeclarativeBase], + paramname, + value, + include_existing_col, + optional, + ): + global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - if alias_type.typekeyword: - eq_(MyClass.data_one.type.length, 33) - elif alias_type.typekeyword_nested: - eq_(MyClass.data_one.type.length, 42) + args = [] + params = {} + if paramname == "type": + args.append(value) else: - eq_(MyClass.data_one.type.length, None) + params[paramname] = value - @testing.variation( - "type_", - [ - "literal", - "literal_typing", - "recursive", - "not_literal", - "not_literal_typing", - "generic", - "generic_typing", - "generic_typed", - "generic_typed_typing", - ], - ) - @testing.combinations(True, False, argnames="in_map") - @testing.requires.python312 - def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): - """test #11305.""" + element_ref = Annotated[int, mapped_column(*args, **params)] + if optional: + element_ref = Optional[element_ref] - def declare(): - class Foo(decl_base): - __tablename__ = "footable" + class Element(decl_base): + __tablename__ = "element" - id: Mapped[int] = mapped_column(primary_key=True) - if type_.recursive: - status: Mapped[_RecursiveLiteral695] # noqa: F821 - elif type_.literal: - status: Mapped[_Literal695] # noqa: F821 - elif type_.literal_typing: - status: Mapped[_TypingLiteral695] # noqa: F821 - elif type_.not_literal: - status: Mapped[_StrPep695] # noqa: F821 - elif type_.not_literal_typing: - status: Mapped[_TypingStrPep695] # noqa: F821 - elif type_.generic: - status: Mapped[_GenericPep695] # noqa: F821 - elif type_.generic_typing: - status: Mapped[_TypingGenericPep695] # noqa: F821 - elif type_.generic_typed: - status: Mapped[_GenericPep695Typed] # noqa: F821 - elif type_.generic_typed_typing: - status: Mapped[_TypingGenericPep695Typed] # noqa: F821 - else: - type_.fail() + id: Mapped[intpk] + + if include_existing_col: + data: Mapped[element_ref] = mapped_column() + else: + data: Mapped[element_ref] - return Foo + data_col = Element.__table__.c.data + if paramname in ( + "default", + "onupdate", + "server_default", + "server_onupdate", + ): + default = getattr(data_col, paramname) + if default.is_server_default and default.has_argument: + is_(default.arg, value) + is_(default.column, data_col) + elif paramname == "type": + assert type(data_col.type) is type(value) + else: + is_(getattr(data_col, paramname), value) - if in_map: - decl_base.registry.update_type_annotation_map( - { - _Literal695: Enum(enum.Enum), # noqa: F821 - _TypingLiteral695: Enum(enum.Enum), # noqa: F821 - _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 - _StrPep695: Enum(enum.Enum), # noqa: F821 - _TypingStrPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 - } - ) - if type_.literal or type_.literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["to-do", "in-progress", "done"]) - is_(col.type.native_enum, False) + # test _copy() for #8410 + is_(getattr(data_col._copy(), paramname), value) + + sd = data_col.server_default + if sd is not None and isinstance(sd, Identity): + if paramname == "nullable" and value: + is_(data_col.nullable, True) else: - with expect_raises_message( - exc.ArgumentError, - "Can't associate TypeAliasType '.+' to an Enum " - "since it's not a direct alias of a Literal. Only " - "aliases in this form `type my_alias = Literal.'a', " - "'b'.` are supported when generating Enums.", - ): - declare() + is_(data_col.nullable, False) + elif paramname != "nullable": + is_(data_col.nullable, optional) else: - with expect_raises_message( - exc.ArgumentError, - "Could not locate SQLAlchemy Core type for Python type " - ".+ inside the 'status' attribute Mapped annotation", - ): - declare() + is_(data_col.nullable, value) - def test_typing_literal_identity(self, decl_base): - """See issue #11820""" + @testing.combinations(True, False, argnames="specify_identity") + @testing.combinations(True, False, None, argnames="specify_nullable") + @testing.combinations(True, False, argnames="optional") + @testing.combinations(True, False, argnames="include_existing_col") + def test_combine_args_from_pep593_identity_nullable( + self, + decl_base: Type[DeclarativeBase], + specify_identity, + specify_nullable, + optional, + include_existing_col, + ): + global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - class Foo(decl_base): - __tablename__ = "footable" + if specify_identity: + args = [Identity()] + else: + args = [] - id: Mapped[int] = mapped_column(primary_key=True) - t: Mapped[_TypingLiteral] - te: Mapped[_TypingExtensionsLiteral] + if specify_nullable is not None: + params = {"nullable": specify_nullable} + else: + params = {} - for col in (Foo.__table__.c.t, Foo.__table__.c.te): - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["a", "b"]) - is_(col.type.native_enum, False) + element_ref = Annotated[int, mapped_column(*args, **params)] + if optional: + element_ref = Optional[element_ref] - def test_we_got_all_attrs_test_annotated(self): - argnames = _py_inspect.getfullargspec(mapped_column) - assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( - f"annotated attributes were not tested: " - f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" - ) + class Element(decl_base): + __tablename__ = "element" - @annotated_name_test_cases( - ("sort_order", 100, lambda sort_order: sort_order == 100), - ("nullable", False, lambda column: column.nullable is False), - ( - "active_history", - True, - lambda column_property: column_property.active_history is True, - ), - ( - "deferred", - True, - lambda column_property: column_property.deferred is True, - ), - ( - "deferred", - _NoArg.NO_ARG, - lambda column_property: column_property is None, - ), - ( - "deferred_group", - "mygroup", - lambda column_property: column_property.deferred is True - and column_property.group == "mygroup", - ), - ( - "deferred_raiseload", - True, - lambda column_property: column_property.deferred is True - and column_property.raiseload is True, - ), - ( - "server_default", - "25", - lambda column: column.server_default.arg == "25", - ), - ( - "server_onupdate", - "25", - lambda column: column.server_onupdate.arg == "25", - ), - ( - "default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "insert_default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "onupdate", - 25, - lambda column: column.onupdate.arg == 25, - ), - ("doc", "some doc", lambda column: column.doc == "some doc"), - ( - "comment", - "some comment", - lambda column: column.comment == "some comment", - ), - ("index", True, lambda column: column.index is True), - ("index", _NoArg.NO_ARG, lambda column: column.index is None), - ("index", False, lambda column: column.index is False), - ("unique", True, lambda column: column.unique is True), - ("unique", False, lambda column: column.unique is False), - ("autoincrement", True, lambda column: column.autoincrement is True), - ("system", True, lambda column: column.system is True), - ("primary_key", True, lambda column: column.primary_key is True), - ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), - ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), - ( - "use_existing_column", - True, - lambda mc: mc._use_existing_column is True, - ), - ( - "quote", - True, - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "key", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "name", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "kw_only", - True, - exc.SADeprecationWarning( - "Argument 'kw_only' is a dataclass argument " - ), - ), - ( - "compare", - True, - exc.SADeprecationWarning( - "Argument 'compare' is a dataclass argument " - ), - ), - ( - "default_factory", - lambda: 25, - exc.SADeprecationWarning( - "Argument 'default_factory' is a dataclass argument " - ), - ), - ( - "repr", - True, - exc.SADeprecationWarning( - "Argument 'repr' is a dataclass argument " - ), - ), - ( - "init", - True, - exc.SADeprecationWarning( - "Argument 'init' is a dataclass argument" - ), - ), - ( - "hash", - True, - exc.SADeprecationWarning( - "Argument 'hash' is a dataclass argument" - ), - ), - ( - "dataclass_metadata", - {}, - exc.SADeprecationWarning( - "Argument 'dataclass_metadata' is a dataclass argument" - ), - ), - argnames="argname, argument, assertion", + id: Mapped[intpk] + + if include_existing_col: + data: Mapped[element_ref] = mapped_column() + else: + data: Mapped[element_ref] + + # test identity + _copy() for #8410 + for col in ( + Element.__table__.c.data, + Element.__table__.c.data._copy(), + ): + if specify_nullable is True: + is_(col.nullable, True) + elif specify_identity: + is_(col.nullable, False) + elif specify_nullable is False: + is_(col.nullable, False) + elif not optional: + is_(col.nullable, False) + else: + is_(col.nullable, True) + + @testing.combinations( + ("default", lambda ctx: 10, lambda ctx: 15), + ("default", func.foo(), func.bar()), + ("onupdate", lambda ctx: 10, lambda ctx: 15), + ("onupdate", func.foo(), func.bar()), + ("server_onupdate", func.foo(), func.bar()), + ("server_default", func.foo(), func.bar()), + ("nullable", True, False), + ("nullable", False, True), + ("type", BigInteger(), Numeric()), + argnames="paramname, value, override_value", ) - @testing.variation("use_annotated", [True, False, "control"]) - def test_names_encountered_for_annotated( - self, argname, argument, assertion, use_annotated, decl_base + def test_dont_combine_args_from_pep593( + self, + decl_base: Type[DeclarativeBase], + paramname, + value, + override_value, ): - global myint - - if argument is not _NoArg.NO_ARG: - kw = {argname: argument} + global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - if argname == "quote": - kw["name"] = "somename" + args = [] + params = {} + override_args = [] + override_params = {} + if paramname == "type": + args.append(value) + override_args.append(override_value) else: - kw = {} + params[paramname] = value + if paramname == "default": + override_params["insert_default"] = override_value + else: + override_params[paramname] = override_value - is_warning = isinstance(assertion, exc.SADeprecationWarning) - is_dataclass = argname in ( - "kw_only", - "init", - "repr", - "compare", - "default_factory", - "hash", - "dataclass_metadata", - ) + element_ref = Annotated[int, mapped_column(*args, **params)] - if is_dataclass: + class Element(decl_base): + __tablename__ = "element" - class Base(MappedAsDataclass, decl_base): - __abstract__ = True + id: Mapped[intpk] - else: - Base = decl_base + data: Mapped[element_ref] = mapped_column( + *override_args, **override_params + ) - if use_annotated.control: - # test in reverse; that kw set on the main mapped_column() takes - # effect when the Annotated is there also and does not have the - # kw - amc = mapped_column() - myint = Annotated[int, amc] + if paramname in ( + "default", + "onupdate", + "server_default", + "server_onupdate", + ): + default = getattr(Element.__table__.c.data, paramname) + is_(default.arg, override_value) + is_(default.column, Element.__table__.c.data) + elif paramname == "type": + assert type(Element.__table__.c.data.type) is type(override_value) + else: + is_(getattr(Element.__table__.c.data, paramname), override_value) - mc = mapped_column(**kw) + def test_use_existing_column_from_pep_593(self, decl_base): + """test #12787""" - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + global Label + Label = Annotated[ + str, mapped_column(String(20), use_existing_column=True) + ] - elif use_annotated: - amc = mapped_column(**kw) - myint = Annotated[int, amc] + class A(decl_base): + __tablename__ = "table_a" - mc = mapped_column() + id: Mapped[int] = mapped_column(primary_key=True) + discriminator: Mapped[int] - if is_warning: - with expect_deprecated(assertion.args[0]): + __mapper_args__ = { + "polymorphic_on": "discriminator", + "polymorphic_abstract": True, + } - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + class A_1(A): + label: Mapped[Label] - else: + __mapper_args__ = {"polymorphic_identity": 1} - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + class A_2(A): + label: Mapped[Label] - else: - mc = cast(MappedColumn, mapped_column(**kw)) + __mapper_args__ = {"polymorphic_identity": 2} - mapper_prop = mc.mapper_property_to_assign - column_to_assign, sort_order = mc.columns_to_assign[0] + is_(A_1.label.property.columns[0], A_2.label.property.columns[0]) - if not is_warning: - assert_result = testing.resolve_lambda( - assertion, - sort_order=sort_order, - column_property=mapper_prop, - column=column_to_assign, - mc=mc, - ) - assert assert_result - elif is_dataclass and (not use_annotated or use_annotated.control): - eq_( - getattr(mc._attribute_options, f"dataclasses_{argname}"), - argument, - ) + eq_(A_1.label.property.columns[0].table, A.__table__) + eq_(A_2.label.property.columns[0].table, A.__table__) - @testing.combinations(("index",), ("unique",), argnames="paramname") - @testing.combinations((True,), (False,), (None,), argnames="orig") - @testing.combinations((True,), (False,), (None,), argnames="merging") - def test_index_unique_combinations( - self, paramname, orig, merging, decl_base + @testing.variation("in_map", [True, False]) + @testing.variation("alias_type", ["plain", "pep695"]) + @testing.requires.python312 + def test_generic_typealias_pep593( + self, decl_base: Type[DeclarativeBase], alias_type: Variation, in_map ): - """test #11091""" - global myint - - amc = mapped_column(**{paramname: merging}) - myint = Annotated[int, amc] + if in_map: + decl_base.registry.update_type_annotation_map( + { + _GenericPep593TypeAlias[str]: VARCHAR(33), + _GenericPep593Pep695[str]: VARCHAR(33), + } + ) - mc = mapped_column(**{paramname: orig}) + class MyClass(decl_base): + __tablename__ = "my_table" - class User(decl_base): - __tablename__ = "user" id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc - result = getattr(User.__table__.c.myname, paramname) - if orig is None: - is_(result, merging) + if alias_type.plain: + data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 + elif alias_type.pep695: + data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 + else: + alias_type.fail() + + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + if in_map: + eq_(MyClass.data_one.expression.type.length, 33) else: - is_(result, orig) + eq_(MyClass.data_one.expression.type.length, None) - def test_pep484_newtypes_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] - ): - global str50, str30, str3050 - str50 = NewType("str50", str) - str30 = NewType("str30", str) - str3050 = NewType("str30", str50) +class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): + __dialect__ = "default" - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30), str3050: String(150)} - ) + @testing.combinations( + (str, types.String), + (Decimal, types.Numeric), + (float, types.Float), + (datetime.datetime, types.DateTime), + (uuid.UUID, types.Uuid), + argnames="pytype_arg,sqltype", + ) + def test_datatype_lookups(self, decl_base, pytype_arg, sqltype): + global pytype + pytype = pytype_arg class MyClass(decl_base): - __tablename__ = "my_table" + __tablename__ = "mytable" + id: Mapped[int] = mapped_column(primary_key=True) - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[str50] - data_three: Mapped[Optional[str30]] - data_four: Mapped[str3050] + data: Mapped[pytype] - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + assert isinstance(MyClass.__table__.c.data.type, sqltype) - eq_(MyClass.__table__.c.data_two.type.length, 50) - is_false(MyClass.__table__.c.data_two.nullable) + @testing.combinations( + (BIGINT(),), + (BIGINT,), + (Integer().with_variant(BIGINT, "default")), + (Integer().with_variant(BIGINT(), "default")), + (BIGINT().with_variant(String(), "some_other_dialect")), + ) + def test_type_map_varieties(self, typ): + Base = declarative_base(type_annotation_map={int: typ}) - eq_(MyClass.__table__.c.data_three.type.length, 30) - is_true(MyClass.__table__.c.data_three.nullable) + class MyClass(Base): + __tablename__ = "mytable" - eq_(MyClass.__table__.c.data_four.type.length, 150) - is_false(MyClass.__table__.c.data_four.nullable) + id: Mapped[int] = mapped_column(primary_key=True) + x: Mapped[int] + y: Mapped[int] = mapped_column() + z: Mapped[int] = mapped_column(typ) - def test_newtype_missing_from_map(self, decl_base): - global str50 + self.assert_compile( + CreateTable(MyClass.__table__), + "CREATE TABLE mytable (id BIGINT NOT NULL, " + "x BIGINT NOT NULL, y BIGINT NOT NULL, z BIGINT NOT NULL, " + "PRIMARY KEY (id))", + ) - str50 = NewType("str50", str) + def test_dont_ignore_unresolvable(self, decl_base): + """test #8888""" with expect_raises_message( - orm_exc.MappedAnnotationError, - "Could not locate SQLAlchemy Core type for Python type " - ".*str50 inside the 'data_one' attribute Mapped annotation", + sa_exc.ArgumentError, + r"Could not resolve all types within mapped annotation: " + r"\".*Mapped\[.*fake.*\]\". Ensure all types are written " + r"correctly and are imported within the module in use.", ): - class MyClass(decl_base): - __tablename__ = "my_table" + class A(decl_base): + __tablename__ = "a" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[str50] - - def test_extract_base_type_from_pep593( - self, decl_base: Type[DeclarativeBase] - ): - """base type is extracted from an Annotated structure if not otherwise - in the type lookup dictionary""" - - class MyClass(decl_base): - __tablename__ = "my_table" + data: Mapped["fake"] # noqa - id: Mapped[Annotated[Annotated[int, "q"], "t"]] = mapped_column( - primary_key=True - ) + def test_type_dont_mis_resolve_on_superclass(self): + """test for #8859. - is_(MyClass.__table__.c.id.type._type_affinity, Integer) + For subclasses of a type that's in the map, don't resolve this + by default, even though we do a search through __mro__. - def test_extract_sqla_from_pep593_not_yet( - self, decl_base: Type[DeclarativeBase] - ): - """https://twitter.com/zzzeek/status/1536693554621341697""" + """ + global int_sub - global SomeRelated + class int_sub(int): + pass - class SomeRelated(decl_base): - __tablename__: ClassVar[Optional[str]] = "some_related" - id: Mapped["int"] = mapped_column(primary_key=True) + Base = declarative_base( + type_annotation_map={ + int: Integer, + } + ) with expect_raises_message( - NotImplementedError, - r"Use of the 'Relationship' construct inside of an Annotated " - r"object is not yet supported.", + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type", ): - class MyClass(decl_base): - __tablename__ = "my_table" - - id: Mapped["int"] = mapped_column(primary_key=True) - data_one: Mapped[Annotated["SomeRelated", relationship()]] - - def test_extract_sqla_from_pep593_plain( - self, decl_base: Type[DeclarativeBase] - ): - """extraction of mapped_column() from the Annotated type - - https://twitter.com/zzzeek/status/1536693554621341697""" - global intpk, strnone, str30nullable - global opt_strnone, opt_str30 - - intpk = Annotated[int, mapped_column(primary_key=True)] - - strnone = Annotated[str, mapped_column()] # str -> NOT NULL - str30nullable = Annotated[ - str, mapped_column(String(30), nullable=True) # nullable -> NULL - ] - opt_strnone = Optional[strnone] # Optional[str] -> NULL - opt_str30 = Optional[str30nullable] # nullable -> NULL - - class MyClass(decl_base): - __tablename__ = "my_table" + class MyClass(Base): + __tablename__ = "mytable" - id: Mapped[intpk] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[int_sub] - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + @testing.variation("dict_key", ["typing", "plain"]) + def test_type_dont_mis_resolve_on_non_generic(self, dict_key): + """test for #8859. - class MyOtherClass(decl_base): - __tablename__ = "my_other_table" + For a specific generic type with arguments, don't do any MRO + lookup. - id: Mapped[intpk] + """ - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + Base = declarative_base( + type_annotation_map={ + dict: String, + } + ) - for cls in MyClass, MyOtherClass: - table = cls.__table__ - assert table is not None + with expect_raises_message( + sa_exc.ArgumentError, "Could not locate SQLAlchemy Core type" + ): - is_(table.c.id.primary_key, True) - is_(table.c.id.table, table) + class MyClass(Base): + __tablename__ = "mytable" - eq_(table.c.data_one.type.length, None) - eq_(table.c.data_two.type.length, 30) - eq_(table.c.data_three.type.length, None) + id: Mapped[int] = mapped_column(primary_key=True) - is_false(table.c.data_one.nullable) - is_true(table.c.data_two.nullable) - is_true(table.c.data_three.nullable) - is_true(table.c.data_four.nullable) + if dict_key.plain: + data: Mapped[dict[str, str]] + elif dict_key.typing: + data: Mapped[Dict[str, str]] - def test_extract_sqla_from_pep593_mixin( - self, decl_base: Type[DeclarativeBase] - ): - """extraction of mapped_column() from the Annotated type + def test_type_secondary_resolution(self): + class MyString(String): + def _resolve_for_python_type( + self, python_type, matched_type, matched_on_flattened + ): + return String(length=42) - https://twitter.com/zzzeek/status/1536693554621341697""" + Base = declarative_base(type_annotation_map={str: MyString}) - global intpk, strnone, str30nullable - global opt_strnone, opt_str30 - intpk = Annotated[int, mapped_column(primary_key=True)] + class MyClass(Base): + __tablename__ = "mytable" - strnone = Annotated[str, mapped_column()] # str -> NOT NULL - str30nullable = Annotated[ - str, mapped_column(String(30), nullable=True) # nullable -> NULL - ] - opt_strnone = Optional[strnone] # Optional[str] -> NULL - opt_str30 = Optional[str30nullable] # nullable -> NULL + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] - class HasPk: - id: Mapped[intpk] + is_true(isinstance(MyClass.__table__.c.data.type, String)) + eq_(MyClass.__table__.c.data.type.length, 42) - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] + def test_construct_lhs_type_missing(self, decl_base): + global MyClass - class MyClass(HasPk, decl_base): - __tablename__ = "my_table" + class MyClass: + pass - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving for Python " + r"type indicated by '.*class .*MyClass.*' inside the " + r"Mapped\[\] annotation for the 'data' attribute; the type " + "object is not resolvable by the registry", + ): - table = MyClass.__table__ - assert table is not None + class User(decl_base): + __tablename__ = "users" - is_(table.c.id.primary_key, True) - is_(table.c.id.table, table) + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[MyClass] = mapped_column() - eq_(table.c.data_one.type.length, None) - eq_(table.c.data_two.type.length, 30) - eq_(table.c.data_three.type.length, None) + @testing.variation( + "argtype", + [ + "type", + "column", + "mapped_column", + "column_class", + "ref_to_type", + "ref_to_column", + ], + ) + def test_construct_lhs_sqlalchemy_type(self, decl_base, argtype): + """test for #12329. - is_false(table.c.data_one.nullable) - is_true(table.c.data_two.nullable) - is_true(table.c.data_three.nullable) - is_true(table.c.data_four.nullable) + of note here are all the different messages we have for when the + wrong thing is put into Mapped[], and in fact in #12329 we added + another one. - @testing.variation("to_assert", ["ddl", "fkcount", "references"]) - @testing.variation("assign_blank", [True, False]) - def test_extract_fk_col_from_pep593( - self, decl_base: Type[DeclarativeBase], to_assert, assign_blank - ): - global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] - element_ref = Annotated[int, mapped_column(ForeignKey("element.id"))] + This is a lot of different messages, but at the same time they + occur at different places in the interpretation of types. If + we were to centralize all these messages, we'd still likely end up + doing distinct messages for each scenario, so instead we added + a new ArgumentError subclass MappedAnnotationError that provides + some commonality to all of these cases. - class Element(decl_base): - __tablename__ = "element" - id: Mapped[intpk] + """ + expect_future_annotations = "annotations" in globals() - class RefElementOne(decl_base): - __tablename__ = "refone" + if argtype.type: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, type is + # a SQL type + "The type provided inside the 'data' attribute Mapped " + "annotation is the SQLAlchemy type .*BigInteger.*. Expected " + "a Python type instead", + ): - id: Mapped[intpk] + class User(decl_base): + __tablename__ = "users" - if assign_blank: - other_id: Mapped[element_ref] = mapped_column() - else: - other_id: Mapped[element_ref] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[BigInteger] = mapped_column() - class RefElementTwo(decl_base): - __tablename__ = "reftwo" + elif argtype.column: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # util.py -> _extract_mapped_subtype + ( + re.escape( + "Could not interpret annotation " + "Mapped[Column('q', BigInteger)]." + ) + if expect_future_annotations + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + else re.escape( + "The object provided inside the 'data' attribute " + "Mapped annotation is not a Python type, it's the " + "object Column('q', BigInteger(), table=None). " + "Expected a Python type." + ) + ), + ): - id: Mapped[intpk] - if assign_blank: - some_id: Mapped[element_ref] = mapped_column() - else: - some_id: Mapped[element_ref] + class User(decl_base): + __tablename__ = "users" - assert Element.__table__ is not None - assert RefElementOne.__table__ is not None - assert RefElementTwo.__table__ is not None + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[Column("q", BigInteger)] = ( # noqa: F821 + mapped_column() + ) - if to_assert.fkcount: - # test #9766 - eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1) - eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1) - elif to_assert.references: - is_true( - RefElementOne.__table__.c.other_id.references( - Element.__table__.c.id - ) - ) - is_true( - RefElementTwo.__table__.c.some_id.references( - Element.__table__.c.id - ) - ) + elif argtype.mapped_column: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + # interestingly, this raises at the same point for both + # future annotations mode and legacy annotations mode + r"The object provided inside the 'data' attribute " + "Mapped annotation is not a Python type, it's the object " + r"\. " + "Expected a Python type.", + ): - elif to_assert.ddl: - self.assert_compile( - CreateTable(RefElementOne.__table__), - "CREATE TABLE refone " - "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, " - "PRIMARY KEY (id), " - "FOREIGN KEY(other_id) REFERENCES element (id))", - ) - self.assert_compile( - CreateTable(RefElementTwo.__table__), - "CREATE TABLE reftwo " - "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, " - "PRIMARY KEY (id), " - "FOREIGN KEY(some_id) REFERENCES element (id))", - ) - else: - to_assert.fail() + class User(decl_base): + __tablename__ = "users" - @testing.combinations( - (collections.abc.Sequence, (str,)), - (collections.abc.MutableSequence, (str,)), - (collections.abc.Mapping, (str, str)), - (collections.abc.MutableMapping, (str, str)), - (typing.Mapping, (str, str)), - (typing.MutableMapping, (str, str)), - (typing.Sequence, (str,)), - (typing.MutableSequence, (str,)), - (list, (str,)), - (List, (str,)), - (dict, (str, str)), - (Dict, (str, str)), - (list, None), - (List, None), - (dict, None), - (Dict, None), - id_="sa", - argnames="container_typ,args", - ) - @testing.variation("style", ["pep593", "alias", "direct"]) - def test_extract_composed(self, container_typ, args, style): - """test #9099 (pep593) + id: Mapped[int] = mapped_column(primary_key=True) + big_integer: Mapped[int] = mapped_column() + data: Mapped[big_integer] = mapped_column() - test #11814 + elif argtype.column_class: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, type is not + # a SQL type + "Could not locate SQLAlchemy Core type when resolving for " + "Python type indicated by " + r"'.*class .*.Column.*' inside the " + r"Mapped\[\] annotation for the 'data' attribute; the " + "type object is not resolvable by the registry", + ): - test #11831, regression from #11814 - """ + class User(decl_base): + __tablename__ = "users" - global TestType + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[Column] = mapped_column() - if style.pep593: - if args is None: - TestType = Annotated[container_typ, 0] - else: - TestType = Annotated[container_typ[args], 0] - elif style.alias: - if args is None: - TestType = container_typ - else: - TestType = container_typ[args] - elif style.direct: - TestType = container_typ + elif argtype.ref_to_type: + mytype = BigInteger + with expect_raises_message( + orm_exc.MappedAnnotationError, + ( + # decl_base.py -> _exract_mappable_attributes + re.escape( + "Could not resolve all types within mapped " + 'annotation: "Mapped[mytype]"' + ) + if expect_future_annotations + # properties.py -> _init_column_for_annotation, type is + # a SQL type + else re.escape( + "The type provided inside the 'data' attribute Mapped " + "annotation is the SQLAlchemy type " + ". " + "Expected a Python type instead" + ) + ), + ): - class Base(DeclarativeBase): - if style.direct: - if args == (str, str): - type_annotation_map = {TestType[str, str]: JSON()} - elif args is None: - type_annotation_map = {TestType: JSON()} - else: - type_annotation_map = {TestType[str]: JSON()} - else: - type_annotation_map = {TestType: JSON()} + class User(decl_base): + __tablename__ = "users" - class MyClass(Base): - __tablename__ = "my_table" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[mytype] = mapped_column() - id: Mapped[int] = mapped_column(primary_key=True) + elif argtype.ref_to_column: + mycol = Column("q", BigInteger) - if style.direct: - if args == (str, str): - data: Mapped[TestType[str, str]] = mapped_column() - elif args is None: - data: Mapped[TestType] = mapped_column() - else: - data: Mapped[TestType[str]] = mapped_column() - else: - data: Mapped[TestType] = mapped_column() + with expect_raises_message( + orm_exc.MappedAnnotationError, + # decl_base.py -> _exract_mappable_attributes + ( + re.escape( + "Could not resolve all types within mapped " + 'annotation: "Mapped[mycol]"' + ) + if expect_future_annotations + else + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + re.escape( + "The object provided inside the 'data' attribute " + "Mapped " + "annotation is not a Python type, it's the object " + "Column('q', BigInteger(), table=None). " + "Expected a Python type." + ) + ), + ): - is_(MyClass.__table__.c.data.type._type_affinity, JSON) + class User(decl_base): + __tablename__ = "users" - @testing.combinations( - ("default", lambda ctx: 10), - ("default", func.foo()), - ("onupdate", lambda ctx: 10), - ("onupdate", func.foo()), - ("server_onupdate", func.foo()), - ("server_default", func.foo()), - ("server_default", Identity()), - ("nullable", True), - ("nullable", False), - ("type", BigInteger()), - ("index", True), - ("unique", True), - argnames="paramname, value", - ) - @testing.combinations(True, False, argnames="optional") - @testing.combinations(True, False, argnames="include_existing_col") - def test_combine_args_from_pep593( - self, - decl_base: Type[DeclarativeBase], - paramname, - value, - include_existing_col, - optional, - ): - global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[mycol] = mapped_column() - args = [] - params = {} - if paramname == "type": - args.append(value) else: - params[paramname] = value + argtype.fail() - element_ref = Annotated[int, mapped_column(*args, **params)] - if optional: - element_ref = Optional[element_ref] + def test_plain_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + decl_base.registry.update_type_annotation_map( + {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} + ) - class Element(decl_base): - __tablename__ = "element" + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[_StrTypeAlias] + structure: Mapped[_UnionTypeAlias] - id: Mapped[intpk] + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) + + @testing.variation( + "option", + [ + "plain", + "union", + "union_604", + "null", + "union_null", + "union_null_604", + "optional", + "optional_union", + "optional_union_604", + "union_newtype", + "union_null_newtype", + "union_695", + "union_null_695", + ], + ) + @testing.variation("in_map", ["yes", "no", "value"]) + @testing.requires.python312 + def test_pep695_behavior(self, decl_base, in_map, option): + """Issue #11955; later issue #12829""" - if include_existing_col: - data: Mapped[element_ref] = mapped_column() - else: - data: Mapped[element_ref] + global tat - data_col = Element.__table__.c.data - if paramname in ( - "default", - "onupdate", - "server_default", - "server_onupdate", - ): - default = getattr(data_col, paramname) - if default.is_server_default and default.has_argument: - is_(default.arg, value) - is_(default.column, data_col) - elif paramname == "type": - assert type(data_col.type) is type(value) + if option.plain: + tat = TypeAliasType("tat", str) + elif option.union: + tat = TypeAliasType("tat", Union[str, int]) + elif option.union_604: + tat = TypeAliasType("tat", str | int) + elif option.null: + tat = TypeAliasType("tat", None) + elif option.union_null: + tat = TypeAliasType("tat", Union[str, int, None]) + elif option.union_null_604: + tat = TypeAliasType("tat", str | int | None) + elif option.optional: + tat = TypeAliasType("tat", Optional[str]) + elif option.optional_union: + tat = TypeAliasType("tat", Optional[Union[str, int]]) + elif option.optional_union_604: + tat = TypeAliasType("tat", Optional[str | int]) + elif option.union_newtype: + # this seems to be illegal for typing but "works" + tat = NewType("tat", Union[str, int]) + elif option.union_null_newtype: + # this seems to be illegal for typing but "works" + tat = NewType("tat", Union[str, int, None]) + elif option.union_695: + tat = TypeAliasType("tat", str | int) + elif option.union_null_695: + tat = TypeAliasType("tat", str | int | None) else: - is_(getattr(data_col, paramname), value) + option.fail() - # test _copy() for #8410 - is_(getattr(data_col._copy(), paramname), value) + is_newtype = "newtype" in option.name + if in_map.yes: + decl_base.registry.update_type_annotation_map({tat: String(99)}) + elif in_map.value and not is_newtype: + decl_base.registry.update_type_annotation_map( + {tat.__value__: String(99)} + ) - sd = data_col.server_default - if sd is not None and isinstance(sd, Identity): - if paramname == "nullable" and value: - is_(data_col.nullable, True) - else: - is_(data_col.nullable, False) - elif paramname != "nullable": - is_(data_col.nullable, optional) - else: - is_(data_col.nullable, value) + def declare(): + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[tat] - @testing.combinations(True, False, argnames="specify_identity") - @testing.combinations(True, False, None, argnames="specify_nullable") - @testing.combinations(True, False, argnames="optional") - @testing.combinations(True, False, argnames="include_existing_col") - def test_combine_args_from_pep593_identity_nullable( - self, - decl_base: Type[DeclarativeBase], - specify_identity, - specify_nullable, - optional, - include_existing_col, - ): - global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] + return Test.__table__.c.data - if specify_identity: - args = [Identity()] + if in_map.yes or (in_map.value and not is_newtype): + col = declare() + # String(99) inside the type_map + is_true(isinstance(col.type, String)) + eq_(col.type.length, 99) + nullable = "null" in option.name or "optional" in option.name + eq_(col.nullable, nullable) + elif option.plain or option.optional: + col = declare() + # plain string from default lookup + is_true(isinstance(col.type, String)) + eq_(col.type.length, None) + nullable = "null" in option.name or "optional" in option.name + eq_(col.nullable, nullable) else: - args = [] + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.*tat' inside the Mapped\[\] " + r"annotation for the 'data' attribute;", + ): + declare() - if specify_nullable is not None: - params = {"nullable": specify_nullable} - else: - params = {} + @testing.variation("in_map", ["yes", "no", "value"]) + @testing.variation("lookup", ["A", "B", "value"]) + def test_recursive_pep695_cases( + self, decl_base, in_map: Variation, lookup: Variation + ): + global A, B + A = TypingTypeAliasType("A", Union[int, float]) + B = TypingTypeAliasType("B", A) - element_ref = Annotated[int, mapped_column(*args, **params)] - if optional: - element_ref = Optional[element_ref] + if in_map.yes: + decl_base.registry.update_type_annotation_map({A: Numeric(10, 5)}) + elif in_map.value: + decl_base.registry.update_type_annotation_map( + {A.__value__: Numeric(10, 5)} + ) - class Element(decl_base): - __tablename__ = "element" + def declare(): + class MyClass(decl_base): + __tablename__ = "my_table" + id: Mapped[int] = mapped_column(primary_key=True) - id: Mapped[intpk] + if lookup.A: + data: Mapped[A] + elif lookup.B: + data: Mapped[B] + elif lookup.value: + data: Mapped[Union[int, float]] + else: + lookup.fail() - if include_existing_col: - data: Mapped[element_ref] = mapped_column() - else: - data: Mapped[element_ref] + return MyClass - # test identity + _copy() for #8410 - for col in ( - Element.__table__.c.data, - Element.__table__.c.data._copy(), + if ( + (in_map.value and lookup.B) + or in_map.no + or (in_map.yes and lookup.value) ): - if specify_nullable is True: - is_(col.nullable, True) - elif specify_identity: - is_(col.nullable, False) - elif specify_nullable is False: - is_(col.nullable, False) - elif not optional: - is_(col.nullable, False) - else: - is_(col.nullable, True) + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving " + "for Python type indicated by", + ): + declare() + else: + MyClass = declare() + eq_(MyClass.data.expression.type.precision, 10) - @testing.combinations( - ("default", lambda ctx: 10, lambda ctx: 15), - ("default", func.foo(), func.bar()), - ("onupdate", lambda ctx: 10, lambda ctx: 15), - ("onupdate", func.foo(), func.bar()), - ("server_onupdate", func.foo(), func.bar()), - ("server_default", func.foo(), func.bar()), - ("nullable", True, False), - ("nullable", False, True), - ("type", BigInteger(), Numeric()), - argnames="paramname, value, override_value", + @testing.variation( + "type_", + [ + "str_extension", + "str_typing", + "generic_extension", + "generic_typing", + "generic_typed_extension", + "generic_typed_typing", + ], ) - def test_dont_combine_args_from_pep593( - self, - decl_base: Type[DeclarativeBase], - paramname, - value, - override_value, + @testing.requires.python312 + def test_pep695_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase], type_ ): - global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] - - args = [] - params = {} - override_args = [] - override_params = {} - if paramname == "type": - args.append(value) - override_args.append(override_value) - else: - params[paramname] = value - if paramname == "default": - override_params["insert_default"] = override_value - else: - override_params[paramname] = override_value + """test #10807, #12829""" - element_ref = Annotated[int, mapped_column(*args, **params)] - - class Element(decl_base): - __tablename__ = "element" + decl_base.registry.update_type_annotation_map( + { + _UnionPep695: JSON, + _StrPep695: String(30), + _TypingStrPep695: String(30), + _GenericPep695: String(30), + _TypingGenericPep695: String(30), + _GenericPep695Typed: String(30), + _TypingGenericPep695Typed: String(30), + } + ) - id: Mapped[intpk] + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + if type_.str_extension: + data: Mapped[_StrPep695] + elif type_.str_typing: + data: Mapped[_TypingStrPep695] + elif type_.generic_extension: + data: Mapped[_GenericPep695] + elif type_.generic_typing: + data: Mapped[_TypingGenericPep695] + elif type_.generic_typed_extension: + data: Mapped[_GenericPep695Typed] + elif type_.generic_typed_typing: + data: Mapped[_TypingGenericPep695Typed] + else: + type_.fail() + structure: Mapped[_UnionPep695] - data: Mapped[element_ref] = mapped_column( - *override_args, **override_params - ) + eq_(Test.__table__.c.data.type._type_affinity, String) + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) - if paramname in ( - "default", - "onupdate", - "server_default", - "server_onupdate", - ): - default = getattr(Element.__table__.c.data, paramname) - is_(default.arg, override_value) - is_(default.column, Element.__table__.c.data) - elif paramname == "type": - assert type(Element.__table__.c.data.type) is type(override_value) - else: - is_(getattr(Element.__table__.c.data, paramname), override_value) + def test_pep484_newtypes_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + global str50, str30, str3050 - def test_use_existing_column_from_pep_593(self, decl_base): - """test #12787""" + str50 = NewType("str50", str) + str30 = NewType("str30", str) + str3050 = NewType("str30", str50) - global Label - Label = Annotated[ - str, mapped_column(String(20), use_existing_column=True) - ] + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30), str3050: String(150)} + ) - class A(decl_base): - __tablename__ = "table_a" + class MyClass(decl_base): + __tablename__ = "my_table" - id: Mapped[int] = mapped_column(primary_key=True) - discriminator: Mapped[int] + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[str50] + data_three: Mapped[Optional[str30]] + data_four: Mapped[str3050] - __mapper_args__ = { - "polymorphic_on": "discriminator", - "polymorphic_abstract": True, - } + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) - class A_1(A): - label: Mapped[Label] + eq_(MyClass.__table__.c.data_two.type.length, 50) + is_false(MyClass.__table__.c.data_two.nullable) - __mapper_args__ = {"polymorphic_identity": 1} + eq_(MyClass.__table__.c.data_three.type.length, 30) + is_true(MyClass.__table__.c.data_three.nullable) - class A_2(A): - label: Mapped[Label] + eq_(MyClass.__table__.c.data_four.type.length, 150) + is_false(MyClass.__table__.c.data_four.nullable) - __mapper_args__ = {"polymorphic_identity": 2} + def test_newtype_missing_from_map(self, decl_base): + global str50 - is_(A_1.label.property.columns[0], A_2.label.property.columns[0]) + str50 = NewType("str50", str) - eq_(A_1.label.property.columns[0].table, A.__table__) - eq_(A_2.label.property.columns[0].table, A.__table__) + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving for Python " + r"type indicated by '.*.str50' inside the Mapped\[\] annotation " + "for the 'data_one' attribute; the type object is not " + "resolvable by the registry", + ): + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + data_one: Mapped[str50] @testing.variation( "union", @@ -2342,300 +2635,96 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): json: Mapped[Optional[Union[List[int], List[str]]]] = mc json2: Mapped[Optional[Union[list[int], list[str]]]] = mc2 elif option.optional_fwd_ref: - json: Mapped["Optional[Union[List[int], List[str]]]"] = mc - json2: Mapped["Optional[Union[list[int], list[str]]]"] = ( - mc2 - ) - elif option.union_none: - json: Mapped[Union[List[int], List[str], None]] = mc - json2: Mapped[Union[None, list[int], list[str]]] = mc2 - elif option.pep604: - json: Mapped[list[int] | list[str] | None] = mc - json2: Mapped[None | list[int] | list[str]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["list[int] | list[str] | None"] = mc - json2: Mapped["None | list[int] | list[str]"] = mc2 - else: - brackets.fail() - - is_(A.__table__.c.json.type._type_affinity, JSON) - if hasattr(A, "json2"): - is_(A.__table__.c.json2.type._type_affinity, JSON) - if option.not_optional: - is_false(A.__table__.c.json2.nullable) - else: - is_true(A.__table__.c.json2.nullable) - - if option.not_optional: - is_false(A.__table__.c.json.nullable) - else: - is_true(A.__table__.c.json.nullable) - - @testing.variation("optional", [True, False]) - @testing.variation("provide_type", [True, False]) - @testing.variation("add_to_type_map", [True, False]) - def test_recursive_type( - self, decl_base, optional, provide_type, add_to_type_map - ): - """test #9553""" - - global T - - T = Dict[str, Optional["T"]] - - if not provide_type and not add_to_type_map: - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", - ): - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - - return - - else: - if add_to_type_map: - decl_base.registry.update_type_annotation_map({T: JSON()}) - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if add_to_type_map: - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - else: - if optional: - type_test: Mapped[Optional[T]] = mapped_column(JSON()) - else: - type_test: Mapped[T] = mapped_column(JSON()) - - if optional: - is_(TypeTest.__table__.c.type_test.nullable, True) - else: - is_(TypeTest.__table__.c.type_test.nullable, False) - - self.assert_compile( - select(TypeTest), - "SELECT my_table.id, my_table.type_test FROM my_table", - ) - - def test_missing_mapped_lhs(self, decl_base): - with expect_annotation_syntax_error("User.name"): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: str = mapped_column() # type: ignore - - def test_construct_lhs_separate_name(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - data: Mapped[Optional[str]] = mapped_column("the_data") - - self.assert_compile( - select(User.data), "SELECT users.the_data FROM users" - ) - is_true(User.__table__.c.the_data.nullable) - - def test_construct_works_in_expr(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - - class Address(decl_base): - __tablename__ = "addresses" - - id: Mapped[int] = mapped_column(primary_key=True) - user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - - user = relationship(User, primaryjoin=user_id == User.id) - - self.assert_compile( - select(Address.user_id, User.id).join(Address.user), - "SELECT addresses.user_id, users.id FROM addresses " - "JOIN users ON addresses.user_id = users.id", - ) - - def test_construct_works_as_polymorphic_on(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - type: Mapped[str] = mapped_column() - - __mapper_args__ = {"polymorphic_on": type} - - decl_base.registry.configure() - is_(User.__table__.c.type, User.__mapper__.polymorphic_on) - - def test_construct_works_as_version_id_col(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - version_id: Mapped[int] = mapped_column() - - __mapper_args__ = {"version_id_col": version_id} - - decl_base.registry.configure() - is_(User.__table__.c.version_id, User.__mapper__.version_id_col) - - def test_construct_works_in_deferred(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = deferred(mapped_column()) - - self.assert_compile(select(User), "SELECT users.id FROM users") - self.assert_compile( - select(User).options(undefer(User.data)), - "SELECT users.id, users.data FROM users", - ) - - def test_deferred_kw(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = mapped_column(deferred=True) - - self.assert_compile(select(User), "SELECT users.id FROM users") - self.assert_compile( - select(User).options(undefer(User.data)), - "SELECT users.id, users.data FROM users", - ) - - @testing.combinations( - (str, types.String), - (Decimal, types.Numeric), - (float, types.Float), - (datetime.datetime, types.DateTime), - (uuid.UUID, types.Uuid), - argnames="pytype_arg,sqltype", - ) - def test_datatype_lookups(self, decl_base, pytype_arg, sqltype): - global pytype - pytype = pytype_arg - - class MyClass(decl_base): - __tablename__ = "mytable" - id: Mapped[int] = mapped_column(primary_key=True) - - data: Mapped[pytype] - - assert isinstance(MyClass.__table__.c.data.type, sqltype) + json: Mapped["Optional[Union[List[int], List[str]]]"] = mc + json2: Mapped["Optional[Union[list[int], list[str]]]"] = ( + mc2 + ) + elif option.union_none: + json: Mapped[Union[List[int], List[str], None]] = mc + json2: Mapped[Union[None, list[int], list[str]]] = mc2 + elif option.pep604: + json: Mapped[list[int] | list[str] | None] = mc + json2: Mapped[None | list[int] | list[str]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["list[int] | list[str] | None"] = mc + json2: Mapped["None | list[int] | list[str]"] = mc2 + else: + brackets.fail() - def test_dont_ignore_unresolvable(self, decl_base): - """test #8888""" + is_(A.__table__.c.json.type._type_affinity, JSON) + if hasattr(A, "json2"): + is_(A.__table__.c.json2.type._type_affinity, JSON) + if option.not_optional: + is_false(A.__table__.c.json2.nullable) + else: + is_true(A.__table__.c.json2.nullable) - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not resolve all types within mapped annotation: " - r"\".*Mapped\[.*fake.*\]\". Ensure all types are written " - r"correctly and are imported within the module in use.", - ): + if option.not_optional: + is_false(A.__table__.c.json.nullable) + else: + is_true(A.__table__.c.json.nullable) - class A(decl_base): - __tablename__ = "a" + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped["fake"] # noqa + global T - def test_type_dont_mis_resolve_on_superclass(self): - """test for #8859. + T = Dict[str, Optional["T"]] - For subclasses of a type that's in the map, don't resolve this - by default, even though we do a search through __mro__. + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): - """ - global int_sub + class TypeTest(decl_base): + __tablename__ = "my_table" - class int_sub(int): - pass + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() - Base = declarative_base( - type_annotation_map={ - int: Integer, - } - ) + return - with expect_raises_message( - orm_exc.MappedAnnotationError, - "Could not locate SQLAlchemy Core type", - ): + else: + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) - class MyClass(Base): - __tablename__ = "mytable" + class TypeTest(decl_base): + __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[int_sub] - - @testing.variation("dict_key", ["typing", "plain"]) - def test_type_dont_mis_resolve_on_non_generic(self, dict_key): - """test for #8859. - For a specific generic type with arguments, don't do any MRO - lookup. + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) - """ + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) - Base = declarative_base( - type_annotation_map={ - dict: String, - } + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", ) - with expect_raises_message( - sa_exc.ArgumentError, "Could not locate SQLAlchemy Core type" - ): - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - - if dict_key.plain: - data: Mapped[dict[str, str]] - elif dict_key.typing: - data: Mapped[Dict[str, str]] - - def test_type_secondary_resolution(self): - class MyString(String): - def _resolve_for_python_type( - self, python_type, matched_type, matched_on_flattened - ): - return String(length=42) - - Base = declarative_base(type_annotation_map={str: MyString}) - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] - - is_true(isinstance(MyClass.__table__.c.data.type, String)) - eq_(MyClass.__table__.c.data.type.length, 42) - -class EnumOrLiteralTypeMapTest(fixtures.TestBase, testing.AssertsCompiledSQL): +class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @testing.variation("use_explicit_name", [True, False]) @@ -2766,6 +2855,102 @@ class EnumOrLiteralTypeMapTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_(MyClass.__table__.c.data.type.enum_class, FooEnum) eq_(MyClass.__table__.c.data.type.name, "fooenum") # and not 'enum' + @testing.variation( + "type_", + [ + "literal", + "literal_typing", + "recursive", + "not_literal", + "not_literal_typing", + "generic", + "generic_typing", + "generic_typed", + "generic_typed_typing", + ], + ) + @testing.combinations(True, False, argnames="in_map") + @testing.requires.python312 + def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): + """test #11305.""" + + def declare(): + class Foo(decl_base): + __tablename__ = "footable" + + id: Mapped[int] = mapped_column(primary_key=True) + if type_.recursive: + status: Mapped[_RecursiveLiteral695] # noqa: F821 + elif type_.literal: + status: Mapped[_Literal695] # noqa: F821 + elif type_.literal_typing: + status: Mapped[_TypingLiteral695] # noqa: F821 + elif type_.not_literal: + status: Mapped[_StrPep695] # noqa: F821 + elif type_.not_literal_typing: + status: Mapped[_TypingStrPep695] # noqa: F821 + elif type_.generic: + status: Mapped[_GenericPep695] # noqa: F821 + elif type_.generic_typing: + status: Mapped[_TypingGenericPep695] # noqa: F821 + elif type_.generic_typed: + status: Mapped[_GenericPep695Typed] # noqa: F821 + elif type_.generic_typed_typing: + status: Mapped[_TypingGenericPep695Typed] # noqa: F821 + else: + type_.fail() + + return Foo + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _Literal695: Enum(enum.Enum), # noqa: F821 + _TypingLiteral695: Enum(enum.Enum), # noqa: F821 + _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 + _StrPep695: Enum(enum.Enum), # noqa: F821 + _TypingStrPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 + } + ) + if type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + else: + with expect_raises_message( + exc.ArgumentError, + "Can't associate TypeAliasType '.+' to an Enum " + "since it's not a direct alias of a Literal. Only " + "aliases in this form `type my_alias = Literal.'a', " + "'b'.` are supported when generating Enums.", + ): + declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + elif type_.not_literal or type_.not_literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, String)) + else: + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.+' inside the Mapped\[\] " + r"annotation for the 'status' attribute", + ): + declare() + @testing.variation( "sqltype", [ @@ -3845,32 +4030,6 @@ class CompositeTest(fixtures.TestBase, testing.AssertsCompiledSQL): mapped_column(), mapped_column(), mapped_column("zip") ) - def test_extract_from_pep593(self, decl_base): - global Address - - @dataclasses.dataclass - class Address: - street: str - state: str - zip_: str - - class User(decl_base): - __tablename__ = "user" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - - address: Mapped[Annotated[Address, "foo"]] = composite( - mapped_column(), mapped_column(), mapped_column("zip") - ) - - self.assert_compile( - select(User), - 'SELECT "user".id, "user".name, "user".street, ' - '"user".state, "user".zip FROM "user"', - dialect="default", - ) - def test_cls_not_composite_compliant(self, decl_base): global Address diff --git a/test/orm/declarative/test_typed_mapping.py b/test/orm/declarative/test_typed_mapping.py index fda7fa25fd..0aa0ded6a0 100644 --- a/test/orm/declarative/test_typed_mapping.py +++ b/test/orm/declarative/test_typed_mapping.py @@ -155,6 +155,19 @@ _TypingLiteral695 = TypingTypeAliasType( ) _RecursiveLiteral695 = TypeAliasType("_RecursiveLiteral695", _Literal695) +_GenericPep593TypeAlias = Annotated[TV, mapped_column(info={"hi": "there"})] + +_GenericPep593Pep695 = TypingTypeAliasType( + "_GenericPep593Pep695", + Annotated[TV, mapped_column(info={"hi": "there"})], + type_params=(TV,), +) + +_RecursivePep695Pep593 = TypingTypeAliasType( + "_RecursivePep695Pep593", + Annotated[_TypingStrPep695, mapped_column(info={"hi": "there"})], +) + def expect_annotation_syntax_error(name): return expect_raises_message( @@ -319,31 +332,6 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert Child.__mapper__.attrs.parent.strategy.use_get - @testing.combinations( - (BIGINT(),), - (BIGINT,), - (Integer().with_variant(BIGINT, "default")), - (Integer().with_variant(BIGINT(), "default")), - (BIGINT().with_variant(String(), "some_other_dialect")), - ) - def test_type_map_varieties(self, typ): - Base = declarative_base(type_annotation_map={int: typ}) - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - x: Mapped[int] - y: Mapped[int] = mapped_column() - z: Mapped[int] = mapped_column(typ) - - self.assert_compile( - CreateTable(MyClass.__table__), - "CREATE TABLE mytable (id BIGINT NOT NULL, " - "x BIGINT NOT NULL, y BIGINT NOT NULL, z BIGINT NOT NULL, " - "PRIMARY KEY (id))", - ) - def test_required_no_arg(self, decl_base): with expect_raises_message( sa_exc.ArgumentError, @@ -599,198 +587,6 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(User.__table__.c.data.nullable) assert isinstance(User.__table__.c.created_at.type, DateTime) - def test_construct_lhs_type_missing(self, decl_base): - # anno only: global MyClass - - class MyClass: - pass - - with expect_raises_message( - sa_exc.ArgumentError, - "Could not locate SQLAlchemy Core type for Python type " - ".*MyClass.* inside the 'data' attribute Mapped annotation", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[MyClass] = mapped_column() - - @testing.variation( - "argtype", - [ - "type", - "column", - "mapped_column", - "column_class", - "ref_to_type", - "ref_to_column", - ], - ) - def test_construct_lhs_sqlalchemy_type(self, decl_base, argtype): - """test for #12329. - - of note here are all the different messages we have for when the - wrong thing is put into Mapped[], and in fact in #12329 we added - another one. - - This is a lot of different messages, but at the same time they - occur at different places in the interpretation of types. If - we were to centralize all these messages, we'd still likely end up - doing distinct messages for each scenario, so instead we added - a new ArgumentError subclass MappedAnnotationError that provides - some commonality to all of these cases. - - - """ - expect_future_annotations = "annotations" in globals() - - if argtype.type: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, type is - # a SQL type - "The type provided inside the 'data' attribute Mapped " - "annotation is the SQLAlchemy type .*BigInteger.*. Expected " - "a Python type instead", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[BigInteger] = mapped_column() - - elif argtype.column: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # util.py -> _extract_mapped_subtype - ( - re.escape( - "Could not interpret annotation " - "Mapped[Column('q', BigInteger)]." - ) - if expect_future_annotations - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - else re.escape( - "The object provided inside the 'data' attribute " - "Mapped annotation is not a Python type, it's the " - "object Column('q', BigInteger(), table=None). " - "Expected a Python type." - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[Column("q", BigInteger)] = ( # noqa: F821 - mapped_column() - ) - - elif argtype.mapped_column: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - # interestingly, this raises at the same point for both - # future annotations mode and legacy annotations mode - r"The object provided inside the 'data' attribute " - "Mapped annotation is not a Python type, it's the object " - r"\. " - "Expected a Python type.", - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - big_integer: Mapped[int] = mapped_column() - data: Mapped[big_integer] = mapped_column() - - elif argtype.column_class: - with expect_raises_message( - orm_exc.MappedAnnotationError, - # properties.py -> _init_column_for_annotation, type is not - # a SQL type - re.escape( - "Could not locate SQLAlchemy Core type for Python type " - " inside the " - "'data' attribute Mapped annotation" - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[Column] = mapped_column() - - elif argtype.ref_to_type: - mytype = BigInteger - with expect_raises_message( - orm_exc.MappedAnnotationError, - ( - # decl_base.py -> _exract_mappable_attributes - re.escape( - "Could not resolve all types within mapped " - 'annotation: "Mapped[mytype]"' - ) - if expect_future_annotations - # properties.py -> _init_column_for_annotation, type is - # a SQL type - else re.escape( - "The type provided inside the 'data' attribute Mapped " - "annotation is the SQLAlchemy type " - ". " - "Expected a Python type instead" - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[mytype] = mapped_column() - - elif argtype.ref_to_column: - mycol = Column("q", BigInteger) - - with expect_raises_message( - orm_exc.MappedAnnotationError, - # decl_base.py -> _exract_mappable_attributes - ( - re.escape( - "Could not resolve all types within mapped " - 'annotation: "Mapped[mycol]"' - ) - if expect_future_annotations - else - # properties.py -> _init_column_for_annotation, object is - # not a SQL type or a python type, it's just some object - re.escape( - "The object provided inside the 'data' attribute " - "Mapped " - "annotation is not a Python type, it's the object " - "Column('q', BigInteger(), table=None). " - "Expected a Python type." - ) - ), - ): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[mycol] = mapped_column() - - else: - argtype.fail() - def test_construct_rhs_type_override_lhs(self, decl_base): class Element(decl_base): __tablename__ = "element" @@ -956,1175 +752,1672 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_true(User.__table__.c.lnl_rnl._copy().nullable) def test_fwd_refs(self, decl_base: Type[DeclarativeBase]): + # TODO: add an assertion? class MyClass(decl_base): __tablename__ = "my_table" id: Mapped["int"] = mapped_column(primary_key=True) data_one: Mapped["str"] - def test_pep593_types_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] - ): - """neat!!!""" - # anno only: global str50, str30, opt_str50, opt_str30 + def test_typing_literal_identity(self, decl_base): + """See issue #11820""" - str50 = Annotated[str, 50] - str30 = Annotated[str, 30] - opt_str50 = Optional[str50] - opt_str30 = Optional[str30] + class Foo(decl_base): + __tablename__ = "footable" - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30)} - ) + id: Mapped[int] = mapped_column(primary_key=True) + t: Mapped[_TypingLiteral] + te: Mapped[_TypingExtensionsLiteral] - class MyClass(decl_base): - __tablename__ = "my_table" + for col in (Foo.__table__.c.t, Foo.__table__.c.te): + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["a", "b"]) + is_(col.type.native_enum, False) - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[opt_str30] - data_three: Mapped[str50] - data_four: Mapped[opt_str50] - data_five: Mapped[str] - data_six: Mapped[Optional[str]] + def test_we_got_all_attrs_test_annotated(self): + argnames = _py_inspect.getfullargspec(mapped_column) + assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( + f"annotated attributes were not tested: " + f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" + ) - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + @annotated_name_test_cases( + ("sort_order", 100, lambda sort_order: sort_order == 100), + ("nullable", False, lambda column: column.nullable is False), + ( + "active_history", + True, + lambda column_property: column_property.active_history is True, + ), + ( + "deferred", + True, + lambda column_property: column_property.deferred is True, + ), + ( + "deferred", + _NoArg.NO_ARG, + lambda column_property: column_property is None, + ), + ( + "deferred_group", + "mygroup", + lambda column_property: column_property.deferred is True + and column_property.group == "mygroup", + ), + ( + "deferred_raiseload", + True, + lambda column_property: column_property.deferred is True + and column_property.raiseload is True, + ), + ( + "server_default", + "25", + lambda column: column.server_default.arg == "25", + ), + ( + "server_onupdate", + "25", + lambda column: column.server_onupdate.arg == "25", + ), + ( + "default", + 25, + lambda column: column.default.arg == 25, + ), + ( + "insert_default", + 25, + lambda column: column.default.arg == 25, + ), + ( + "onupdate", + 25, + lambda column: column.onupdate.arg == 25, + ), + ("doc", "some doc", lambda column: column.doc == "some doc"), + ( + "comment", + "some comment", + lambda column: column.comment == "some comment", + ), + ("index", True, lambda column: column.index is True), + ("index", _NoArg.NO_ARG, lambda column: column.index is None), + ("index", False, lambda column: column.index is False), + ("unique", True, lambda column: column.unique is True), + ("unique", False, lambda column: column.unique is False), + ("autoincrement", True, lambda column: column.autoincrement is True), + ("system", True, lambda column: column.system is True), + ("primary_key", True, lambda column: column.primary_key is True), + ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), + ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), + ( + "use_existing_column", + True, + lambda mc: mc._use_existing_column is True, + ), + ( + "quote", + True, + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "key", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "name", + "mykey", + exc.SADeprecationWarning( + "Can't use the 'key' or 'name' arguments in Annotated " + ), + ), + ( + "kw_only", + True, + exc.SADeprecationWarning( + "Argument 'kw_only' is a dataclass argument " + ), + ), + ( + "compare", + True, + exc.SADeprecationWarning( + "Argument 'compare' is a dataclass argument " + ), + ), + ( + "default_factory", + lambda: 25, + exc.SADeprecationWarning( + "Argument 'default_factory' is a dataclass argument " + ), + ), + ( + "repr", + True, + exc.SADeprecationWarning( + "Argument 'repr' is a dataclass argument " + ), + ), + ( + "init", + True, + exc.SADeprecationWarning( + "Argument 'init' is a dataclass argument" + ), + ), + ( + "hash", + True, + exc.SADeprecationWarning( + "Argument 'hash' is a dataclass argument" + ), + ), + ( + "dataclass_metadata", + {}, + exc.SADeprecationWarning( + "Argument 'dataclass_metadata' is a dataclass argument" + ), + ), + argnames="argname, argument, assertion", + ) + @testing.variation("use_annotated", [True, False, "control"]) + def test_names_encountered_for_annotated( + self, argname, argument, assertion, use_annotated, decl_base + ): + # anno only: global myint + + if argument is not _NoArg.NO_ARG: + kw = {argname: argument} + + if argname == "quote": + kw["name"] = "somename" + else: + kw = {} + + is_warning = isinstance(assertion, exc.SADeprecationWarning) + is_dataclass = argname in ( + "kw_only", + "init", + "repr", + "compare", + "default_factory", + "hash", + "dataclass_metadata", + ) + + if is_dataclass: + + class Base(MappedAsDataclass, decl_base): + __abstract__ = True + + else: + Base = decl_base + + if use_annotated.control: + # test in reverse; that kw set on the main mapped_column() takes + # effect when the Annotated is there also and does not have the + # kw + amc = mapped_column() + myint = Annotated[int, amc] + + mc = mapped_column(**kw) + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + elif use_annotated: + amc = mapped_column(**kw) + myint = Annotated[int, amc] + + mc = mapped_column() + + if is_warning: + with expect_deprecated(assertion.args[0]): + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + else: + + class User(Base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + else: + mc = cast(MappedColumn, mapped_column(**kw)) + + mapper_prop = mc.mapper_property_to_assign + column_to_assign, sort_order = mc.columns_to_assign[0] + + if not is_warning: + assert_result = testing.resolve_lambda( + assertion, + sort_order=sort_order, + column_property=mapper_prop, + column=column_to_assign, + mc=mc, + ) + assert assert_result + elif is_dataclass and (not use_annotated or use_annotated.control): + eq_( + getattr(mc._attribute_options, f"dataclasses_{argname}"), + argument, + ) + + @testing.combinations(("index",), ("unique",), argnames="paramname") + @testing.combinations((True,), (False,), (None,), argnames="orig") + @testing.combinations((True,), (False,), (None,), argnames="merging") + def test_index_unique_combinations( + self, paramname, orig, merging, decl_base + ): + """test #11091""" + + # anno only: global myint + + amc = mapped_column(**{paramname: merging}) + myint = Annotated[int, amc] + + mc = mapped_column(**{paramname: orig}) + + class User(decl_base): + __tablename__ = "user" + id: Mapped[int] = mapped_column(primary_key=True) + myname: Mapped[myint] = mc + + result = getattr(User.__table__.c.myname, paramname) + if orig is None: + is_(result, merging) + else: + is_(result, orig) + + def test_missing_mapped_lhs(self, decl_base): + with expect_annotation_syntax_error("User.name"): + + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: str = mapped_column() # type: ignore + + def test_construct_lhs_separate_name(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + data: Mapped[Optional[str]] = mapped_column("the_data") + + self.assert_compile( + select(User.data), "SELECT users.the_data FROM users" + ) + is_true(User.__table__.c.the_data.nullable) + + def test_construct_works_in_expr(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + + class Address(decl_base): + __tablename__ = "addresses" + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) + + user = relationship(User, primaryjoin=user_id == User.id) + + self.assert_compile( + select(Address.user_id, User.id).join(Address.user), + "SELECT addresses.user_id, users.id FROM addresses " + "JOIN users ON addresses.user_id = users.id", + ) + + def test_construct_works_as_polymorphic_on(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + type: Mapped[str] = mapped_column() + + __mapper_args__ = {"polymorphic_on": type} + + decl_base.registry.configure() + is_(User.__table__.c.type, User.__mapper__.polymorphic_on) + + def test_construct_works_as_version_id_col(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + version_id: Mapped[int] = mapped_column() + + __mapper_args__ = {"version_id_col": version_id} + + decl_base.registry.configure() + is_(User.__table__.c.version_id, User.__mapper__.version_id_col) + + def test_construct_works_in_deferred(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = deferred(mapped_column()) + + self.assert_compile(select(User), "SELECT users.id FROM users") + self.assert_compile( + select(User).options(undefer(User.data)), + "SELECT users.id, users.data FROM users", + ) + + def test_deferred_kw(self, decl_base): + class User(decl_base): + __tablename__ = "users" + + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] = mapped_column(deferred=True) + + self.assert_compile(select(User), "SELECT users.id FROM users") + self.assert_compile( + select(User).options(undefer(User.data)), + "SELECT users.id, users.data FROM users", + ) + + +class Pep593InterpretationTests(fixtures.TestBase, testing.AssertsCompiledSQL): + __dialect__ = "default" + + def test_extract_from_pep593(self, decl_base): + # anno only: global Address + + @dataclasses.dataclass + class Address: + street: str + state: str + zip_: str + + class User(decl_base): + __tablename__ = "user" + + id: Mapped[int] = mapped_column(primary_key=True) + name: Mapped[str] = mapped_column() + + address: Mapped[Annotated[Address, "foo"]] = composite( + mapped_column(), mapped_column(), mapped_column("zip") + ) + + self.assert_compile( + select(User), + 'SELECT "user".id, "user".name, "user".street, ' + '"user".state, "user".zip FROM "user"', + dialect="default", + ) + + def test_pep593_types_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + """neat!!!""" + # anno only: global str50, str30, opt_str50, opt_str30 + + str50 = Annotated[str, 50] + str30 = Annotated[str, 30] + opt_str50 = Optional[str50] + opt_str30 = Optional[str30] + + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30)} + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[opt_str30] + data_three: Mapped[str50] + data_four: Mapped[opt_str50] + data_five: Mapped[str] + data_six: Mapped[Optional[str]] + + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) eq_(MyClass.__table__.c.data_two.type.length, 30) is_true(MyClass.__table__.c.data_two.nullable) eq_(MyClass.__table__.c.data_three.type.length, 50) - def test_plain_typealias_as_typemap_keys( + @testing.variation( + "alias_type", + [ + "none", + "typekeyword", + "typekeyword_unpopulated", + "typealias", + "typekeyword_nested", + ], + ) + @testing.requires.python312 + def test_extract_pep593_from_pep695( + self, decl_base: Type[DeclarativeBase], alias_type + ): + """test #11130""" + if alias_type.typekeyword: + decl_base.registry.update_type_annotation_map( + {strtypalias_keyword: VARCHAR(33)} # noqa: F821 + ) + if alias_type.typekeyword_nested: + decl_base.registry.update_type_annotation_map( + {strtypalias_keyword_nested: VARCHAR(42)} # noqa: F821 + ) + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + if alias_type.typekeyword or alias_type.typekeyword_unpopulated: + data_one: Mapped[strtypalias_keyword] # noqa: F821 + elif alias_type.typealias: + data_one: Mapped[strtypalias_ta] # noqa: F821 + elif alias_type.none: + data_one: Mapped[strtypalias_plain] # noqa: F821 + elif alias_type.typekeyword_nested: + data_one: Mapped[strtypalias_keyword_nested] # noqa: F821 + else: + alias_type.fail() + + table = MyClass.__table__ + assert table is not None + + if alias_type.typekeyword_nested: + # a nested annotation is not supported + eq_(MyClass.data_one.expression.info, {}) + else: + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + + if alias_type.typekeyword: + eq_(MyClass.data_one.type.length, 33) + elif alias_type.typekeyword_nested: + eq_(MyClass.data_one.type.length, 42) + else: + eq_(MyClass.data_one.type.length, None) + + @testing.requires.python312 + def test_no_recursive_pep593_from_pep695( + self, decl_base: Type[DeclarativeBase] + ): + def declare(): + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + + data_one: Mapped[_RecursivePep695Pep593] # noqa: F821 + + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving for Python " + r"type " + r"indicated by '_RecursivePep695Pep593' inside the Mapped\[\] " + r"annotation for the 'data_one' attribute; none of " + r"'_RecursivePep695Pep593', " + r"'typing.Annotated\[_TypingStrPep695, .*\]', '_TypingStrPep695' " + r"are resolvable by the registry", + ): + declare() + + def test_extract_base_type_from_pep593( self, decl_base: Type[DeclarativeBase] ): - decl_base.registry.update_type_annotation_map( - {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} - ) + """base type is extracted from an Annotated structure if not otherwise + in the type lookup dictionary""" - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[_StrTypeAlias] - structure: Mapped[_UnionTypeAlias] + class MyClass(decl_base): + __tablename__ = "my_table" - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + id: Mapped[Annotated[Annotated[int, "q"], "t"]] = mapped_column( + primary_key=True + ) - @testing.variation( - "option", - [ - "plain", - "union", - "union_604", - "null", - "union_null", - "union_null_604", - "optional", - "optional_union", - "optional_union_604", - "union_newtype", - "union_null_newtype", - "union_695", - "union_null_695", - ], - ) - @testing.variation("in_map", ["yes", "no", "value"]) - @testing.requires.python312 - def test_pep695_behavior(self, decl_base, in_map, option): - """Issue #11955""" - # anno only: global tat + is_(MyClass.__table__.c.id.type._type_affinity, Integer) - if option.plain: - tat = TypeAliasType("tat", str) - elif option.union: - tat = TypeAliasType("tat", Union[str, int]) - elif option.union_604: - tat = TypeAliasType("tat", str | int) - elif option.null: - tat = TypeAliasType("tat", None) - elif option.union_null: - tat = TypeAliasType("tat", Union[str, int, None]) - elif option.union_null_604: - tat = TypeAliasType("tat", str | int | None) - elif option.optional: - tat = TypeAliasType("tat", Optional[str]) - elif option.optional_union: - tat = TypeAliasType("tat", Optional[Union[str, int]]) - elif option.optional_union_604: - tat = TypeAliasType("tat", Optional[str | int]) - elif option.union_newtype: - # this seems to be illegal for typing but "works" - tat = NewType("tat", Union[str, int]) - elif option.union_null_newtype: - # this seems to be illegal for typing but "works" - tat = NewType("tat", Union[str, int, None]) - elif option.union_695: - tat = TypeAliasType("tat", str | int) - elif option.union_null_695: - tat = TypeAliasType("tat", str | int | None) - else: - option.fail() + def test_extract_sqla_from_pep593_not_yet( + self, decl_base: Type[DeclarativeBase] + ): + """https://twitter.com/zzzeek/status/1536693554621341697""" - if in_map.yes: - decl_base.registry.update_type_annotation_map({tat: String(99)}) - elif in_map.value and "newtype" not in option.name: - decl_base.registry.update_type_annotation_map( - {tat.__value__: String(99)} - ) + global SomeRelated - def declare(): - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[tat] + class SomeRelated(decl_base): + __tablename__: ClassVar[Optional[str]] = "some_related" + id: Mapped["int"] = mapped_column(primary_key=True) - return Test.__table__.c.data + with expect_raises_message( + NotImplementedError, + r"Use of the 'Relationship' construct inside of an Annotated " + r"object is not yet supported.", + ): - if in_map.yes: - col = declare() - is_true(isinstance(col.type, String)) - eq_(col.type.length, 99) - nullable = "null" in option.name or "optional" in option.name - eq_(col.nullable, nullable) + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped["int"] = mapped_column(primary_key=True) + data_one: Mapped[Annotated["SomeRelated", relationship()]] + + def test_extract_sqla_from_pep593_plain( + self, decl_base: Type[DeclarativeBase] + ): + """extraction of mapped_column() from the Annotated type + + https://twitter.com/zzzeek/status/1536693554621341697""" + # anno only: global intpk, strnone, str30nullable + # anno only: global opt_strnone, opt_str30 + + intpk = Annotated[int, mapped_column(primary_key=True)] + + strnone = Annotated[str, mapped_column()] # str -> NOT NULL + str30nullable = Annotated[ + str, mapped_column(String(30), nullable=True) # nullable -> NULL + ] + opt_strnone = Optional[strnone] # Optional[str] -> NULL + opt_str30 = Optional[str30nullable] # nullable -> NULL + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + class MyOtherClass(decl_base): + __tablename__ = "my_other_table" + + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + for cls in MyClass, MyOtherClass: + table = cls.__table__ + assert table is not None + + is_(table.c.id.primary_key, True) + is_(table.c.id.table, table) + + eq_(table.c.data_one.type.length, None) + eq_(table.c.data_two.type.length, 30) + eq_(table.c.data_three.type.length, None) + + is_false(table.c.data_one.nullable) + is_true(table.c.data_two.nullable) + is_true(table.c.data_three.nullable) + is_true(table.c.data_four.nullable) + + def test_extract_sqla_from_pep593_mixin( + self, decl_base: Type[DeclarativeBase] + ): + """extraction of mapped_column() from the Annotated type + + https://twitter.com/zzzeek/status/1536693554621341697""" + + # anno only: global intpk, strnone, str30nullable + # anno only: global opt_strnone, opt_str30 + intpk = Annotated[int, mapped_column(primary_key=True)] + + strnone = Annotated[str, mapped_column()] # str -> NOT NULL + str30nullable = Annotated[ + str, mapped_column(String(30), nullable=True) # nullable -> NULL + ] + opt_strnone = Optional[strnone] # Optional[str] -> NULL + opt_str30 = Optional[str30nullable] # nullable -> NULL + + class HasPk: + id: Mapped[intpk] + + data_one: Mapped[strnone] + data_two: Mapped[str30nullable] + + class MyClass(HasPk, decl_base): + __tablename__ = "my_table" + + data_three: Mapped[opt_strnone] + data_four: Mapped[opt_str30] + + table = MyClass.__table__ + assert table is not None + + is_(table.c.id.primary_key, True) + is_(table.c.id.table, table) + + eq_(table.c.data_one.type.length, None) + eq_(table.c.data_two.type.length, 30) + eq_(table.c.data_three.type.length, None) + + is_false(table.c.data_one.nullable) + is_true(table.c.data_two.nullable) + is_true(table.c.data_three.nullable) + is_true(table.c.data_four.nullable) + + @testing.variation("to_assert", ["ddl", "fkcount", "references"]) + @testing.variation("assign_blank", [True, False]) + def test_extract_fk_col_from_pep593( + self, decl_base: Type[DeclarativeBase], to_assert, assign_blank + ): + # anno only: global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] + element_ref = Annotated[int, mapped_column(ForeignKey("element.id"))] + + class Element(decl_base): + __tablename__ = "element" + + id: Mapped[intpk] + + class RefElementOne(decl_base): + __tablename__ = "refone" + + id: Mapped[intpk] + + if assign_blank: + other_id: Mapped[element_ref] = mapped_column() + else: + other_id: Mapped[element_ref] + + class RefElementTwo(decl_base): + __tablename__ = "reftwo" + + id: Mapped[intpk] + if assign_blank: + some_id: Mapped[element_ref] = mapped_column() + else: + some_id: Mapped[element_ref] + + assert Element.__table__ is not None + assert RefElementOne.__table__ is not None + assert RefElementTwo.__table__ is not None + + if to_assert.fkcount: + # test #9766 + eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1) + eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1) + elif to_assert.references: + is_true( + RefElementOne.__table__.c.other_id.references( + Element.__table__.c.id + ) + ) + is_true( + RefElementTwo.__table__.c.some_id.references( + Element.__table__.c.id + ) + ) + elif to_assert.ddl: + self.assert_compile( + CreateTable(RefElementOne.__table__), + "CREATE TABLE refone " + "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, " + "PRIMARY KEY (id), " + "FOREIGN KEY(other_id) REFERENCES element (id))", + ) + self.assert_compile( + CreateTable(RefElementTwo.__table__), + "CREATE TABLE reftwo " + "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, " + "PRIMARY KEY (id), " + "FOREIGN KEY(some_id) REFERENCES element (id))", + ) else: - with expect_raises_message( - orm_exc.MappedAnnotationError, - r"Could not locate SQLAlchemy Core type for Python type .*tat " - "inside the 'data' attribute Mapped annotation", - ): - declare() + to_assert.fail() - @testing.variation( - "type_", - [ - "str_extension", - "str_typing", - "generic_extension", - "generic_typing", - "generic_typed_extension", - "generic_typed_typing", - ], + @testing.combinations( + (collections.abc.Sequence, (str,)), + (collections.abc.MutableSequence, (str,)), + (collections.abc.Mapping, (str, str)), + (collections.abc.MutableMapping, (str, str)), + (typing.Mapping, (str, str)), + (typing.MutableMapping, (str, str)), + (typing.Sequence, (str,)), + (typing.MutableSequence, (str,)), + (list, (str,)), + (List, (str,)), + (dict, (str, str)), + (Dict, (str, str)), + (list, None), + (List, None), + (dict, None), + (Dict, None), + id_="sa", + argnames="container_typ,args", ) - @testing.requires.python312 - def test_pep695_typealias_as_typemap_keys( - self, decl_base: Type[DeclarativeBase], type_ - ): - """test #10807""" + @testing.variation("style", ["pep593", "alias", "direct"]) + def test_extract_composed(self, container_typ, args, style): + """test #9099 (pep593) - decl_base.registry.update_type_annotation_map( - { - _UnionPep695: JSON, - _StrPep695: String(30), - _TypingStrPep695: String(30), - _GenericPep695: String(30), - _TypingGenericPep695: String(30), - _GenericPep695Typed: String(30), - _TypingGenericPep695Typed: String(30), - } - ) + test #11814 - class Test(decl_base): - __tablename__ = "test" - id: Mapped[int] = mapped_column(primary_key=True) - if type_.str_extension: - data: Mapped[_StrPep695] - elif type_.str_typing: - data: Mapped[_TypingStrPep695] - elif type_.generic_extension: - data: Mapped[_GenericPep695] - elif type_.generic_typing: - data: Mapped[_TypingGenericPep695] - elif type_.generic_typed_extension: - data: Mapped[_GenericPep695Typed] - elif type_.generic_typed_typing: - data: Mapped[_TypingGenericPep695Typed] - else: - type_.fail() - structure: Mapped[_UnionPep695] + test #11831, regression from #11814 + """ - eq_(Test.__table__.c.data.type._type_affinity, String) - eq_(Test.__table__.c.data.type.length, 30) - is_(Test.__table__.c.structure.type._type_affinity, JSON) + global TestType - @testing.variation( - "alias_type", - ["none", "typekeyword", "typealias", "typekeyword_nested"], - ) - @testing.requires.python312 - def test_extract_pep593_from_pep695( - self, decl_base: Type[DeclarativeBase], alias_type - ): - """test #11130""" - if alias_type.typekeyword: - decl_base.registry.update_type_annotation_map( - {strtypalias_keyword: VARCHAR(33)} # noqa: F821 - ) - if alias_type.typekeyword_nested: - decl_base.registry.update_type_annotation_map( - {strtypalias_keyword_nested: VARCHAR(42)} # noqa: F821 - ) + if style.pep593: + if args is None: + TestType = Annotated[container_typ, 0] + else: + TestType = Annotated[container_typ[args], 0] + elif style.alias: + if args is None: + TestType = container_typ + else: + TestType = container_typ[args] + elif style.direct: + TestType = container_typ - class MyClass(decl_base): + class Base(DeclarativeBase): + if style.direct: + if args == (str, str): + type_annotation_map = {TestType[str, str]: JSON()} + elif args is None: + type_annotation_map = {TestType: JSON()} + else: + type_annotation_map = {TestType[str]: JSON()} + else: + type_annotation_map = {TestType: JSON()} + + class MyClass(Base): __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - if alias_type.typekeyword: - data_one: Mapped[strtypalias_keyword] # noqa: F821 - elif alias_type.typealias: - data_one: Mapped[strtypalias_ta] # noqa: F821 - elif alias_type.none: - data_one: Mapped[strtypalias_plain] # noqa: F821 - elif alias_type.typekeyword_nested: - data_one: Mapped[strtypalias_keyword_nested] # noqa: F821 + if style.direct: + if args == (str, str): + data: Mapped[TestType[str, str]] = mapped_column() + elif args is None: + data: Mapped[TestType] = mapped_column() + else: + data: Mapped[TestType[str]] = mapped_column() else: - alias_type.fail() + data: Mapped[TestType] = mapped_column() - table = MyClass.__table__ - assert table is not None + is_(MyClass.__table__.c.data.type._type_affinity, JSON) - if alias_type.typekeyword_nested: - # a nested annotation is not supported - eq_(MyClass.data_one.expression.info, {}) - else: - eq_(MyClass.data_one.expression.info, {"hi": "there"}) + @testing.combinations( + ("default", lambda ctx: 10), + ("default", func.foo()), + ("onupdate", lambda ctx: 10), + ("onupdate", func.foo()), + ("server_onupdate", func.foo()), + ("server_default", func.foo()), + ("server_default", Identity()), + ("nullable", True), + ("nullable", False), + ("type", BigInteger()), + ("index", True), + ("unique", True), + argnames="paramname, value", + ) + @testing.combinations(True, False, argnames="optional") + @testing.combinations(True, False, argnames="include_existing_col") + def test_combine_args_from_pep593( + self, + decl_base: Type[DeclarativeBase], + paramname, + value, + include_existing_col, + optional, + ): + # anno only: global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - if alias_type.typekeyword: - eq_(MyClass.data_one.type.length, 33) - elif alias_type.typekeyword_nested: - eq_(MyClass.data_one.type.length, 42) + args = [] + params = {} + if paramname == "type": + args.append(value) else: - eq_(MyClass.data_one.type.length, None) + params[paramname] = value - @testing.variation( - "type_", - [ - "literal", - "literal_typing", - "recursive", - "not_literal", - "not_literal_typing", - "generic", - "generic_typing", - "generic_typed", - "generic_typed_typing", - ], - ) - @testing.combinations(True, False, argnames="in_map") - @testing.requires.python312 - def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): - """test #11305.""" + element_ref = Annotated[int, mapped_column(*args, **params)] + if optional: + element_ref = Optional[element_ref] - def declare(): - class Foo(decl_base): - __tablename__ = "footable" + class Element(decl_base): + __tablename__ = "element" - id: Mapped[int] = mapped_column(primary_key=True) - if type_.recursive: - status: Mapped[_RecursiveLiteral695] # noqa: F821 - elif type_.literal: - status: Mapped[_Literal695] # noqa: F821 - elif type_.literal_typing: - status: Mapped[_TypingLiteral695] # noqa: F821 - elif type_.not_literal: - status: Mapped[_StrPep695] # noqa: F821 - elif type_.not_literal_typing: - status: Mapped[_TypingStrPep695] # noqa: F821 - elif type_.generic: - status: Mapped[_GenericPep695] # noqa: F821 - elif type_.generic_typing: - status: Mapped[_TypingGenericPep695] # noqa: F821 - elif type_.generic_typed: - status: Mapped[_GenericPep695Typed] # noqa: F821 - elif type_.generic_typed_typing: - status: Mapped[_TypingGenericPep695Typed] # noqa: F821 - else: - type_.fail() + id: Mapped[intpk] + + if include_existing_col: + data: Mapped[element_ref] = mapped_column() + else: + data: Mapped[element_ref] - return Foo + data_col = Element.__table__.c.data + if paramname in ( + "default", + "onupdate", + "server_default", + "server_onupdate", + ): + default = getattr(data_col, paramname) + if default.is_server_default and default.has_argument: + is_(default.arg, value) + is_(default.column, data_col) + elif paramname == "type": + assert type(data_col.type) is type(value) + else: + is_(getattr(data_col, paramname), value) - if in_map: - decl_base.registry.update_type_annotation_map( - { - _Literal695: Enum(enum.Enum), # noqa: F821 - _TypingLiteral695: Enum(enum.Enum), # noqa: F821 - _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 - _StrPep695: Enum(enum.Enum), # noqa: F821 - _TypingStrPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 - _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 - _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 - } - ) - if type_.literal or type_.literal_typing: - Foo = declare() - col = Foo.__table__.c.status - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["to-do", "in-progress", "done"]) - is_(col.type.native_enum, False) + # test _copy() for #8410 + is_(getattr(data_col._copy(), paramname), value) + + sd = data_col.server_default + if sd is not None and isinstance(sd, Identity): + if paramname == "nullable" and value: + is_(data_col.nullable, True) else: - with expect_raises_message( - exc.ArgumentError, - "Can't associate TypeAliasType '.+' to an Enum " - "since it's not a direct alias of a Literal. Only " - "aliases in this form `type my_alias = Literal.'a', " - "'b'.` are supported when generating Enums.", - ): - declare() + is_(data_col.nullable, False) + elif paramname != "nullable": + is_(data_col.nullable, optional) else: - with expect_raises_message( - exc.ArgumentError, - "Could not locate SQLAlchemy Core type for Python type " - ".+ inside the 'status' attribute Mapped annotation", - ): - declare() + is_(data_col.nullable, value) - def test_typing_literal_identity(self, decl_base): - """See issue #11820""" + @testing.combinations(True, False, argnames="specify_identity") + @testing.combinations(True, False, None, argnames="specify_nullable") + @testing.combinations(True, False, argnames="optional") + @testing.combinations(True, False, argnames="include_existing_col") + def test_combine_args_from_pep593_identity_nullable( + self, + decl_base: Type[DeclarativeBase], + specify_identity, + specify_nullable, + optional, + include_existing_col, + ): + # anno only: global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - class Foo(decl_base): - __tablename__ = "footable" + if specify_identity: + args = [Identity()] + else: + args = [] - id: Mapped[int] = mapped_column(primary_key=True) - t: Mapped[_TypingLiteral] - te: Mapped[_TypingExtensionsLiteral] + if specify_nullable is not None: + params = {"nullable": specify_nullable} + else: + params = {} - for col in (Foo.__table__.c.t, Foo.__table__.c.te): - is_true(isinstance(col.type, Enum)) - eq_(col.type.enums, ["a", "b"]) - is_(col.type.native_enum, False) + element_ref = Annotated[int, mapped_column(*args, **params)] + if optional: + element_ref = Optional[element_ref] - def test_we_got_all_attrs_test_annotated(self): - argnames = _py_inspect.getfullargspec(mapped_column) - assert _annotated_names_tested.issuperset(argnames.kwonlyargs), ( - f"annotated attributes were not tested: " - f"{set(argnames.kwonlyargs).difference(_annotated_names_tested)}" - ) + class Element(decl_base): + __tablename__ = "element" - @annotated_name_test_cases( - ("sort_order", 100, lambda sort_order: sort_order == 100), - ("nullable", False, lambda column: column.nullable is False), - ( - "active_history", - True, - lambda column_property: column_property.active_history is True, - ), - ( - "deferred", - True, - lambda column_property: column_property.deferred is True, - ), - ( - "deferred", - _NoArg.NO_ARG, - lambda column_property: column_property is None, - ), - ( - "deferred_group", - "mygroup", - lambda column_property: column_property.deferred is True - and column_property.group == "mygroup", - ), - ( - "deferred_raiseload", - True, - lambda column_property: column_property.deferred is True - and column_property.raiseload is True, - ), - ( - "server_default", - "25", - lambda column: column.server_default.arg == "25", - ), - ( - "server_onupdate", - "25", - lambda column: column.server_onupdate.arg == "25", - ), - ( - "default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "insert_default", - 25, - lambda column: column.default.arg == 25, - ), - ( - "onupdate", - 25, - lambda column: column.onupdate.arg == 25, - ), - ("doc", "some doc", lambda column: column.doc == "some doc"), - ( - "comment", - "some comment", - lambda column: column.comment == "some comment", - ), - ("index", True, lambda column: column.index is True), - ("index", _NoArg.NO_ARG, lambda column: column.index is None), - ("index", False, lambda column: column.index is False), - ("unique", True, lambda column: column.unique is True), - ("unique", False, lambda column: column.unique is False), - ("autoincrement", True, lambda column: column.autoincrement is True), - ("system", True, lambda column: column.system is True), - ("primary_key", True, lambda column: column.primary_key is True), - ("type_", BIGINT, lambda column: isinstance(column.type, BIGINT)), - ("info", {"foo": "bar"}, lambda column: column.info == {"foo": "bar"}), - ( - "use_existing_column", - True, - lambda mc: mc._use_existing_column is True, - ), - ( - "quote", - True, - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "key", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "name", - "mykey", - exc.SADeprecationWarning( - "Can't use the 'key' or 'name' arguments in Annotated " - ), - ), - ( - "kw_only", - True, - exc.SADeprecationWarning( - "Argument 'kw_only' is a dataclass argument " - ), - ), - ( - "compare", - True, - exc.SADeprecationWarning( - "Argument 'compare' is a dataclass argument " - ), - ), - ( - "default_factory", - lambda: 25, - exc.SADeprecationWarning( - "Argument 'default_factory' is a dataclass argument " - ), - ), - ( - "repr", - True, - exc.SADeprecationWarning( - "Argument 'repr' is a dataclass argument " - ), - ), - ( - "init", - True, - exc.SADeprecationWarning( - "Argument 'init' is a dataclass argument" - ), - ), - ( - "hash", - True, - exc.SADeprecationWarning( - "Argument 'hash' is a dataclass argument" - ), - ), - ( - "dataclass_metadata", - {}, - exc.SADeprecationWarning( - "Argument 'dataclass_metadata' is a dataclass argument" - ), - ), - argnames="argname, argument, assertion", + id: Mapped[intpk] + + if include_existing_col: + data: Mapped[element_ref] = mapped_column() + else: + data: Mapped[element_ref] + + # test identity + _copy() for #8410 + for col in ( + Element.__table__.c.data, + Element.__table__.c.data._copy(), + ): + if specify_nullable is True: + is_(col.nullable, True) + elif specify_identity: + is_(col.nullable, False) + elif specify_nullable is False: + is_(col.nullable, False) + elif not optional: + is_(col.nullable, False) + else: + is_(col.nullable, True) + + @testing.combinations( + ("default", lambda ctx: 10, lambda ctx: 15), + ("default", func.foo(), func.bar()), + ("onupdate", lambda ctx: 10, lambda ctx: 15), + ("onupdate", func.foo(), func.bar()), + ("server_onupdate", func.foo(), func.bar()), + ("server_default", func.foo(), func.bar()), + ("nullable", True, False), + ("nullable", False, True), + ("type", BigInteger(), Numeric()), + argnames="paramname, value, override_value", ) - @testing.variation("use_annotated", [True, False, "control"]) - def test_names_encountered_for_annotated( - self, argname, argument, assertion, use_annotated, decl_base + def test_dont_combine_args_from_pep593( + self, + decl_base: Type[DeclarativeBase], + paramname, + value, + override_value, ): - # anno only: global myint - - if argument is not _NoArg.NO_ARG: - kw = {argname: argument} + # anno only: global intpk, element_ref + intpk = Annotated[int, mapped_column(primary_key=True)] - if argname == "quote": - kw["name"] = "somename" + args = [] + params = {} + override_args = [] + override_params = {} + if paramname == "type": + args.append(value) + override_args.append(override_value) else: - kw = {} + params[paramname] = value + if paramname == "default": + override_params["insert_default"] = override_value + else: + override_params[paramname] = override_value - is_warning = isinstance(assertion, exc.SADeprecationWarning) - is_dataclass = argname in ( - "kw_only", - "init", - "repr", - "compare", - "default_factory", - "hash", - "dataclass_metadata", - ) + element_ref = Annotated[int, mapped_column(*args, **params)] - if is_dataclass: + class Element(decl_base): + __tablename__ = "element" - class Base(MappedAsDataclass, decl_base): - __abstract__ = True + id: Mapped[intpk] - else: - Base = decl_base + data: Mapped[element_ref] = mapped_column( + *override_args, **override_params + ) - if use_annotated.control: - # test in reverse; that kw set on the main mapped_column() takes - # effect when the Annotated is there also and does not have the - # kw - amc = mapped_column() - myint = Annotated[int, amc] + if paramname in ( + "default", + "onupdate", + "server_default", + "server_onupdate", + ): + default = getattr(Element.__table__.c.data, paramname) + is_(default.arg, override_value) + is_(default.column, Element.__table__.c.data) + elif paramname == "type": + assert type(Element.__table__.c.data.type) is type(override_value) + else: + is_(getattr(Element.__table__.c.data, paramname), override_value) - mc = mapped_column(**kw) + def test_use_existing_column_from_pep_593(self, decl_base): + """test #12787""" - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + # anno only: global Label + Label = Annotated[ + str, mapped_column(String(20), use_existing_column=True) + ] - elif use_annotated: - amc = mapped_column(**kw) - myint = Annotated[int, amc] + class A(decl_base): + __tablename__ = "table_a" - mc = mapped_column() + id: Mapped[int] = mapped_column(primary_key=True) + discriminator: Mapped[int] - if is_warning: - with expect_deprecated(assertion.args[0]): + __mapper_args__ = { + "polymorphic_on": "discriminator", + "polymorphic_abstract": True, + } - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + class A_1(A): + label: Mapped[Label] - else: + __mapper_args__ = {"polymorphic_identity": 1} - class User(Base): - __tablename__ = "user" - id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc + class A_2(A): + label: Mapped[Label] - else: - mc = cast(MappedColumn, mapped_column(**kw)) + __mapper_args__ = {"polymorphic_identity": 2} - mapper_prop = mc.mapper_property_to_assign - column_to_assign, sort_order = mc.columns_to_assign[0] + is_(A_1.label.property.columns[0], A_2.label.property.columns[0]) - if not is_warning: - assert_result = testing.resolve_lambda( - assertion, - sort_order=sort_order, - column_property=mapper_prop, - column=column_to_assign, - mc=mc, - ) - assert assert_result - elif is_dataclass and (not use_annotated or use_annotated.control): - eq_( - getattr(mc._attribute_options, f"dataclasses_{argname}"), - argument, - ) + eq_(A_1.label.property.columns[0].table, A.__table__) + eq_(A_2.label.property.columns[0].table, A.__table__) - @testing.combinations(("index",), ("unique",), argnames="paramname") - @testing.combinations((True,), (False,), (None,), argnames="orig") - @testing.combinations((True,), (False,), (None,), argnames="merging") - def test_index_unique_combinations( - self, paramname, orig, merging, decl_base + @testing.variation("in_map", [True, False]) + @testing.variation("alias_type", ["plain", "pep695"]) + @testing.requires.python312 + def test_generic_typealias_pep593( + self, decl_base: Type[DeclarativeBase], alias_type: Variation, in_map ): - """test #11091""" - # anno only: global myint - - amc = mapped_column(**{paramname: merging}) - myint = Annotated[int, amc] + if in_map: + decl_base.registry.update_type_annotation_map( + { + _GenericPep593TypeAlias[str]: VARCHAR(33), + _GenericPep593Pep695[str]: VARCHAR(33), + } + ) - mc = mapped_column(**{paramname: orig}) + class MyClass(decl_base): + __tablename__ = "my_table" - class User(decl_base): - __tablename__ = "user" id: Mapped[int] = mapped_column(primary_key=True) - myname: Mapped[myint] = mc - result = getattr(User.__table__.c.myname, paramname) - if orig is None: - is_(result, merging) + if alias_type.plain: + data_one: Mapped[_GenericPep593TypeAlias[str]] # noqa: F821 + elif alias_type.pep695: + data_one: Mapped[_GenericPep593Pep695[str]] # noqa: F821 + else: + alias_type.fail() + + eq_(MyClass.data_one.expression.info, {"hi": "there"}) + if in_map: + eq_(MyClass.data_one.expression.type.length, 33) else: - is_(result, orig) + eq_(MyClass.data_one.expression.type.length, None) - def test_pep484_newtypes_as_typemap_keys( - self, decl_base: Type[DeclarativeBase] - ): - # anno only: global str50, str30, str3050 - str50 = NewType("str50", str) - str30 = NewType("str30", str) - str3050 = NewType("str30", str50) +class TypeResolutionTests(fixtures.TestBase, testing.AssertsCompiledSQL): + __dialect__ = "default" - decl_base.registry.update_type_annotation_map( - {str50: String(50), str30: String(30), str3050: String(150)} - ) + @testing.combinations( + (str, types.String), + (Decimal, types.Numeric), + (float, types.Float), + (datetime.datetime, types.DateTime), + (uuid.UUID, types.Uuid), + argnames="pytype_arg,sqltype", + ) + def test_datatype_lookups(self, decl_base, pytype_arg, sqltype): + # anno only: global pytype + pytype = pytype_arg class MyClass(decl_base): - __tablename__ = "my_table" + __tablename__ = "mytable" + id: Mapped[int] = mapped_column(primary_key=True) - id: Mapped[str50] = mapped_column(primary_key=True) - data_one: Mapped[str30] - data_two: Mapped[str50] - data_three: Mapped[Optional[str30]] - data_four: Mapped[str3050] + data: Mapped[pytype] - eq_(MyClass.__table__.c.data_one.type.length, 30) - is_false(MyClass.__table__.c.data_one.nullable) + assert isinstance(MyClass.__table__.c.data.type, sqltype) - eq_(MyClass.__table__.c.data_two.type.length, 50) - is_false(MyClass.__table__.c.data_two.nullable) + @testing.combinations( + (BIGINT(),), + (BIGINT,), + (Integer().with_variant(BIGINT, "default")), + (Integer().with_variant(BIGINT(), "default")), + (BIGINT().with_variant(String(), "some_other_dialect")), + ) + def test_type_map_varieties(self, typ): + Base = declarative_base(type_annotation_map={int: typ}) - eq_(MyClass.__table__.c.data_three.type.length, 30) - is_true(MyClass.__table__.c.data_three.nullable) + class MyClass(Base): + __tablename__ = "mytable" - eq_(MyClass.__table__.c.data_four.type.length, 150) - is_false(MyClass.__table__.c.data_four.nullable) + id: Mapped[int] = mapped_column(primary_key=True) + x: Mapped[int] + y: Mapped[int] = mapped_column() + z: Mapped[int] = mapped_column(typ) - def test_newtype_missing_from_map(self, decl_base): - # anno only: global str50 + self.assert_compile( + CreateTable(MyClass.__table__), + "CREATE TABLE mytable (id BIGINT NOT NULL, " + "x BIGINT NOT NULL, y BIGINT NOT NULL, z BIGINT NOT NULL, " + "PRIMARY KEY (id))", + ) - str50 = NewType("str50", str) + def test_dont_ignore_unresolvable(self, decl_base): + """test #8888""" with expect_raises_message( - orm_exc.MappedAnnotationError, - "Could not locate SQLAlchemy Core type for Python type " - ".*str50 inside the 'data_one' attribute Mapped annotation", + sa_exc.ArgumentError, + r"Could not resolve all types within mapped annotation: " + r"\".*Mapped\[.*fake.*\]\". Ensure all types are written " + r"correctly and are imported within the module in use.", ): - class MyClass(decl_base): - __tablename__ = "my_table" + class A(decl_base): + __tablename__ = "a" id: Mapped[int] = mapped_column(primary_key=True) - data_one: Mapped[str50] - - def test_extract_base_type_from_pep593( - self, decl_base: Type[DeclarativeBase] - ): - """base type is extracted from an Annotated structure if not otherwise - in the type lookup dictionary""" - - class MyClass(decl_base): - __tablename__ = "my_table" + data: Mapped["fake"] # noqa - id: Mapped[Annotated[Annotated[int, "q"], "t"]] = mapped_column( - primary_key=True - ) + def test_type_dont_mis_resolve_on_superclass(self): + """test for #8859. - is_(MyClass.__table__.c.id.type._type_affinity, Integer) + For subclasses of a type that's in the map, don't resolve this + by default, even though we do a search through __mro__. - def test_extract_sqla_from_pep593_not_yet( - self, decl_base: Type[DeclarativeBase] - ): - """https://twitter.com/zzzeek/status/1536693554621341697""" + """ + # anno only: global int_sub - global SomeRelated + class int_sub(int): + pass - class SomeRelated(decl_base): - __tablename__: ClassVar[Optional[str]] = "some_related" - id: Mapped["int"] = mapped_column(primary_key=True) + Base = declarative_base( + type_annotation_map={ + int: Integer, + } + ) with expect_raises_message( - NotImplementedError, - r"Use of the 'Relationship' construct inside of an Annotated " - r"object is not yet supported.", + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type", ): - class MyClass(decl_base): - __tablename__ = "my_table" - - id: Mapped["int"] = mapped_column(primary_key=True) - data_one: Mapped[Annotated["SomeRelated", relationship()]] - - def test_extract_sqla_from_pep593_plain( - self, decl_base: Type[DeclarativeBase] - ): - """extraction of mapped_column() from the Annotated type - - https://twitter.com/zzzeek/status/1536693554621341697""" - # anno only: global intpk, strnone, str30nullable - # anno only: global opt_strnone, opt_str30 - - intpk = Annotated[int, mapped_column(primary_key=True)] - - strnone = Annotated[str, mapped_column()] # str -> NOT NULL - str30nullable = Annotated[ - str, mapped_column(String(30), nullable=True) # nullable -> NULL - ] - opt_strnone = Optional[strnone] # Optional[str] -> NULL - opt_str30 = Optional[str30nullable] # nullable -> NULL - - class MyClass(decl_base): - __tablename__ = "my_table" + class MyClass(Base): + __tablename__ = "mytable" - id: Mapped[intpk] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[int_sub] - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + @testing.variation("dict_key", ["typing", "plain"]) + def test_type_dont_mis_resolve_on_non_generic(self, dict_key): + """test for #8859. - class MyOtherClass(decl_base): - __tablename__ = "my_other_table" + For a specific generic type with arguments, don't do any MRO + lookup. - id: Mapped[intpk] + """ - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + Base = declarative_base( + type_annotation_map={ + dict: String, + } + ) - for cls in MyClass, MyOtherClass: - table = cls.__table__ - assert table is not None + with expect_raises_message( + sa_exc.ArgumentError, "Could not locate SQLAlchemy Core type" + ): - is_(table.c.id.primary_key, True) - is_(table.c.id.table, table) + class MyClass(Base): + __tablename__ = "mytable" - eq_(table.c.data_one.type.length, None) - eq_(table.c.data_two.type.length, 30) - eq_(table.c.data_three.type.length, None) + id: Mapped[int] = mapped_column(primary_key=True) - is_false(table.c.data_one.nullable) - is_true(table.c.data_two.nullable) - is_true(table.c.data_three.nullable) - is_true(table.c.data_four.nullable) + if dict_key.plain: + data: Mapped[dict[str, str]] + elif dict_key.typing: + data: Mapped[Dict[str, str]] - def test_extract_sqla_from_pep593_mixin( - self, decl_base: Type[DeclarativeBase] - ): - """extraction of mapped_column() from the Annotated type + def test_type_secondary_resolution(self): + class MyString(String): + def _resolve_for_python_type( + self, python_type, matched_type, matched_on_flattened + ): + return String(length=42) - https://twitter.com/zzzeek/status/1536693554621341697""" + Base = declarative_base(type_annotation_map={str: MyString}) - # anno only: global intpk, strnone, str30nullable - # anno only: global opt_strnone, opt_str30 - intpk = Annotated[int, mapped_column(primary_key=True)] + class MyClass(Base): + __tablename__ = "mytable" - strnone = Annotated[str, mapped_column()] # str -> NOT NULL - str30nullable = Annotated[ - str, mapped_column(String(30), nullable=True) # nullable -> NULL - ] - opt_strnone = Optional[strnone] # Optional[str] -> NULL - opt_str30 = Optional[str30nullable] # nullable -> NULL + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] - class HasPk: - id: Mapped[intpk] + is_true(isinstance(MyClass.__table__.c.data.type, String)) + eq_(MyClass.__table__.c.data.type.length, 42) - data_one: Mapped[strnone] - data_two: Mapped[str30nullable] + def test_construct_lhs_type_missing(self, decl_base): + # anno only: global MyClass - class MyClass(HasPk, decl_base): - __tablename__ = "my_table" + class MyClass: + pass - data_three: Mapped[opt_strnone] - data_four: Mapped[opt_str30] + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving for Python " + r"type indicated by '.*class .*MyClass.*' inside the " + r"Mapped\[\] annotation for the 'data' attribute; the type " + "object is not resolvable by the registry", + ): - table = MyClass.__table__ - assert table is not None + class User(decl_base): + __tablename__ = "users" - is_(table.c.id.primary_key, True) - is_(table.c.id.table, table) + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[MyClass] = mapped_column() - eq_(table.c.data_one.type.length, None) - eq_(table.c.data_two.type.length, 30) - eq_(table.c.data_three.type.length, None) + @testing.variation( + "argtype", + [ + "type", + "column", + "mapped_column", + "column_class", + "ref_to_type", + "ref_to_column", + ], + ) + def test_construct_lhs_sqlalchemy_type(self, decl_base, argtype): + """test for #12329. - is_false(table.c.data_one.nullable) - is_true(table.c.data_two.nullable) - is_true(table.c.data_three.nullable) - is_true(table.c.data_four.nullable) + of note here are all the different messages we have for when the + wrong thing is put into Mapped[], and in fact in #12329 we added + another one. - @testing.variation("to_assert", ["ddl", "fkcount", "references"]) - @testing.variation("assign_blank", [True, False]) - def test_extract_fk_col_from_pep593( - self, decl_base: Type[DeclarativeBase], to_assert, assign_blank - ): - # anno only: global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] - element_ref = Annotated[int, mapped_column(ForeignKey("element.id"))] + This is a lot of different messages, but at the same time they + occur at different places in the interpretation of types. If + we were to centralize all these messages, we'd still likely end up + doing distinct messages for each scenario, so instead we added + a new ArgumentError subclass MappedAnnotationError that provides + some commonality to all of these cases. - class Element(decl_base): - __tablename__ = "element" - id: Mapped[intpk] + """ + expect_future_annotations = "annotations" in globals() - class RefElementOne(decl_base): - __tablename__ = "refone" + if argtype.type: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, type is + # a SQL type + "The type provided inside the 'data' attribute Mapped " + "annotation is the SQLAlchemy type .*BigInteger.*. Expected " + "a Python type instead", + ): - id: Mapped[intpk] + class User(decl_base): + __tablename__ = "users" - if assign_blank: - other_id: Mapped[element_ref] = mapped_column() - else: - other_id: Mapped[element_ref] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[BigInteger] = mapped_column() - class RefElementTwo(decl_base): - __tablename__ = "reftwo" + elif argtype.column: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # util.py -> _extract_mapped_subtype + ( + re.escape( + "Could not interpret annotation " + "Mapped[Column('q', BigInteger)]." + ) + if expect_future_annotations + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + else re.escape( + "The object provided inside the 'data' attribute " + "Mapped annotation is not a Python type, it's the " + "object Column('q', BigInteger(), table=None). " + "Expected a Python type." + ) + ), + ): - id: Mapped[intpk] - if assign_blank: - some_id: Mapped[element_ref] = mapped_column() - else: - some_id: Mapped[element_ref] + class User(decl_base): + __tablename__ = "users" - assert Element.__table__ is not None - assert RefElementOne.__table__ is not None - assert RefElementTwo.__table__ is not None + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[Column("q", BigInteger)] = ( # noqa: F821 + mapped_column() + ) - if to_assert.fkcount: - # test #9766 - eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1) - eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1) - elif to_assert.references: - is_true( - RefElementOne.__table__.c.other_id.references( - Element.__table__.c.id - ) - ) - is_true( - RefElementTwo.__table__.c.some_id.references( - Element.__table__.c.id - ) - ) + elif argtype.mapped_column: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + # interestingly, this raises at the same point for both + # future annotations mode and legacy annotations mode + r"The object provided inside the 'data' attribute " + "Mapped annotation is not a Python type, it's the object " + r"\. " + "Expected a Python type.", + ): - elif to_assert.ddl: - self.assert_compile( - CreateTable(RefElementOne.__table__), - "CREATE TABLE refone " - "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, " - "PRIMARY KEY (id), " - "FOREIGN KEY(other_id) REFERENCES element (id))", - ) - self.assert_compile( - CreateTable(RefElementTwo.__table__), - "CREATE TABLE reftwo " - "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, " - "PRIMARY KEY (id), " - "FOREIGN KEY(some_id) REFERENCES element (id))", - ) - else: - to_assert.fail() + class User(decl_base): + __tablename__ = "users" - @testing.combinations( - (collections.abc.Sequence, (str,)), - (collections.abc.MutableSequence, (str,)), - (collections.abc.Mapping, (str, str)), - (collections.abc.MutableMapping, (str, str)), - (typing.Mapping, (str, str)), - (typing.MutableMapping, (str, str)), - (typing.Sequence, (str,)), - (typing.MutableSequence, (str,)), - (list, (str,)), - (List, (str,)), - (dict, (str, str)), - (Dict, (str, str)), - (list, None), - (List, None), - (dict, None), - (Dict, None), - id_="sa", - argnames="container_typ,args", - ) - @testing.variation("style", ["pep593", "alias", "direct"]) - def test_extract_composed(self, container_typ, args, style): - """test #9099 (pep593) + id: Mapped[int] = mapped_column(primary_key=True) + big_integer: Mapped[int] = mapped_column() + data: Mapped[big_integer] = mapped_column() - test #11814 + elif argtype.column_class: + with expect_raises_message( + orm_exc.MappedAnnotationError, + # properties.py -> _init_column_for_annotation, type is not + # a SQL type + "Could not locate SQLAlchemy Core type when resolving for " + "Python type indicated by " + r"'.*class .*.Column.*' inside the " + r"Mapped\[\] annotation for the 'data' attribute; the " + "type object is not resolvable by the registry", + ): - test #11831, regression from #11814 - """ + class User(decl_base): + __tablename__ = "users" - global TestType + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[Column] = mapped_column() - if style.pep593: - if args is None: - TestType = Annotated[container_typ, 0] - else: - TestType = Annotated[container_typ[args], 0] - elif style.alias: - if args is None: - TestType = container_typ - else: - TestType = container_typ[args] - elif style.direct: - TestType = container_typ + elif argtype.ref_to_type: + mytype = BigInteger + with expect_raises_message( + orm_exc.MappedAnnotationError, + ( + # decl_base.py -> _exract_mappable_attributes + re.escape( + "Could not resolve all types within mapped " + 'annotation: "Mapped[mytype]"' + ) + if expect_future_annotations + # properties.py -> _init_column_for_annotation, type is + # a SQL type + else re.escape( + "The type provided inside the 'data' attribute Mapped " + "annotation is the SQLAlchemy type " + ". " + "Expected a Python type instead" + ) + ), + ): - class Base(DeclarativeBase): - if style.direct: - if args == (str, str): - type_annotation_map = {TestType[str, str]: JSON()} - elif args is None: - type_annotation_map = {TestType: JSON()} - else: - type_annotation_map = {TestType[str]: JSON()} - else: - type_annotation_map = {TestType: JSON()} + class User(decl_base): + __tablename__ = "users" - class MyClass(Base): - __tablename__ = "my_table" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[mytype] = mapped_column() - id: Mapped[int] = mapped_column(primary_key=True) + elif argtype.ref_to_column: + mycol = Column("q", BigInteger) - if style.direct: - if args == (str, str): - data: Mapped[TestType[str, str]] = mapped_column() - elif args is None: - data: Mapped[TestType] = mapped_column() - else: - data: Mapped[TestType[str]] = mapped_column() - else: - data: Mapped[TestType] = mapped_column() + with expect_raises_message( + orm_exc.MappedAnnotationError, + # decl_base.py -> _exract_mappable_attributes + ( + re.escape( + "Could not resolve all types within mapped " + 'annotation: "Mapped[mycol]"' + ) + if expect_future_annotations + else + # properties.py -> _init_column_for_annotation, object is + # not a SQL type or a python type, it's just some object + re.escape( + "The object provided inside the 'data' attribute " + "Mapped " + "annotation is not a Python type, it's the object " + "Column('q', BigInteger(), table=None). " + "Expected a Python type." + ) + ), + ): - is_(MyClass.__table__.c.data.type._type_affinity, JSON) + class User(decl_base): + __tablename__ = "users" - @testing.combinations( - ("default", lambda ctx: 10), - ("default", func.foo()), - ("onupdate", lambda ctx: 10), - ("onupdate", func.foo()), - ("server_onupdate", func.foo()), - ("server_default", func.foo()), - ("server_default", Identity()), - ("nullable", True), - ("nullable", False), - ("type", BigInteger()), - ("index", True), - ("unique", True), - argnames="paramname, value", - ) - @testing.combinations(True, False, argnames="optional") - @testing.combinations(True, False, argnames="include_existing_col") - def test_combine_args_from_pep593( - self, - decl_base: Type[DeclarativeBase], - paramname, - value, - include_existing_col, - optional, - ): - # anno only: global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[mycol] = mapped_column() - args = [] - params = {} - if paramname == "type": - args.append(value) else: - params[paramname] = value + argtype.fail() - element_ref = Annotated[int, mapped_column(*args, **params)] - if optional: - element_ref = Optional[element_ref] + def test_plain_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + decl_base.registry.update_type_annotation_map( + {_UnionTypeAlias: JSON, _StrTypeAlias: String(30)} + ) - class Element(decl_base): - __tablename__ = "element" + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[_StrTypeAlias] + structure: Mapped[_UnionTypeAlias] - id: Mapped[intpk] + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) + + @testing.variation( + "option", + [ + "plain", + "union", + "union_604", + "null", + "union_null", + "union_null_604", + "optional", + "optional_union", + "optional_union_604", + "union_newtype", + "union_null_newtype", + "union_695", + "union_null_695", + ], + ) + @testing.variation("in_map", ["yes", "no", "value"]) + @testing.requires.python312 + def test_pep695_behavior(self, decl_base, in_map, option): + """Issue #11955; later issue #12829""" - if include_existing_col: - data: Mapped[element_ref] = mapped_column() - else: - data: Mapped[element_ref] + # anno only: global tat - data_col = Element.__table__.c.data - if paramname in ( - "default", - "onupdate", - "server_default", - "server_onupdate", - ): - default = getattr(data_col, paramname) - if default.is_server_default and default.has_argument: - is_(default.arg, value) - is_(default.column, data_col) - elif paramname == "type": - assert type(data_col.type) is type(value) + if option.plain: + tat = TypeAliasType("tat", str) + elif option.union: + tat = TypeAliasType("tat", Union[str, int]) + elif option.union_604: + tat = TypeAliasType("tat", str | int) + elif option.null: + tat = TypeAliasType("tat", None) + elif option.union_null: + tat = TypeAliasType("tat", Union[str, int, None]) + elif option.union_null_604: + tat = TypeAliasType("tat", str | int | None) + elif option.optional: + tat = TypeAliasType("tat", Optional[str]) + elif option.optional_union: + tat = TypeAliasType("tat", Optional[Union[str, int]]) + elif option.optional_union_604: + tat = TypeAliasType("tat", Optional[str | int]) + elif option.union_newtype: + # this seems to be illegal for typing but "works" + tat = NewType("tat", Union[str, int]) + elif option.union_null_newtype: + # this seems to be illegal for typing but "works" + tat = NewType("tat", Union[str, int, None]) + elif option.union_695: + tat = TypeAliasType("tat", str | int) + elif option.union_null_695: + tat = TypeAliasType("tat", str | int | None) else: - is_(getattr(data_col, paramname), value) + option.fail() - # test _copy() for #8410 - is_(getattr(data_col._copy(), paramname), value) + is_newtype = "newtype" in option.name + if in_map.yes: + decl_base.registry.update_type_annotation_map({tat: String(99)}) + elif in_map.value and not is_newtype: + decl_base.registry.update_type_annotation_map( + {tat.__value__: String(99)} + ) - sd = data_col.server_default - if sd is not None and isinstance(sd, Identity): - if paramname == "nullable" and value: - is_(data_col.nullable, True) - else: - is_(data_col.nullable, False) - elif paramname != "nullable": - is_(data_col.nullable, optional) - else: - is_(data_col.nullable, value) + def declare(): + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[tat] - @testing.combinations(True, False, argnames="specify_identity") - @testing.combinations(True, False, None, argnames="specify_nullable") - @testing.combinations(True, False, argnames="optional") - @testing.combinations(True, False, argnames="include_existing_col") - def test_combine_args_from_pep593_identity_nullable( - self, - decl_base: Type[DeclarativeBase], - specify_identity, - specify_nullable, - optional, - include_existing_col, - ): - # anno only: global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] + return Test.__table__.c.data - if specify_identity: - args = [Identity()] + if in_map.yes or (in_map.value and not is_newtype): + col = declare() + # String(99) inside the type_map + is_true(isinstance(col.type, String)) + eq_(col.type.length, 99) + nullable = "null" in option.name or "optional" in option.name + eq_(col.nullable, nullable) + elif option.plain or option.optional: + col = declare() + # plain string from default lookup + is_true(isinstance(col.type, String)) + eq_(col.type.length, None) + nullable = "null" in option.name or "optional" in option.name + eq_(col.nullable, nullable) else: - args = [] + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.*tat' inside the Mapped\[\] " + r"annotation for the 'data' attribute;", + ): + declare() - if specify_nullable is not None: - params = {"nullable": specify_nullable} - else: - params = {} + @testing.variation("in_map", ["yes", "no", "value"]) + @testing.variation("lookup", ["A", "B", "value"]) + def test_recursive_pep695_cases( + self, decl_base, in_map: Variation, lookup: Variation + ): + # anno only: global A, B + A = TypingTypeAliasType("A", Union[int, float]) + B = TypingTypeAliasType("B", A) - element_ref = Annotated[int, mapped_column(*args, **params)] - if optional: - element_ref = Optional[element_ref] + if in_map.yes: + decl_base.registry.update_type_annotation_map({A: Numeric(10, 5)}) + elif in_map.value: + decl_base.registry.update_type_annotation_map( + {A.__value__: Numeric(10, 5)} + ) - class Element(decl_base): - __tablename__ = "element" + def declare(): + class MyClass(decl_base): + __tablename__ = "my_table" + id: Mapped[int] = mapped_column(primary_key=True) - id: Mapped[intpk] + if lookup.A: + data: Mapped[A] + elif lookup.B: + data: Mapped[B] + elif lookup.value: + data: Mapped[Union[int, float]] + else: + lookup.fail() - if include_existing_col: - data: Mapped[element_ref] = mapped_column() - else: - data: Mapped[element_ref] + return MyClass - # test identity + _copy() for #8410 - for col in ( - Element.__table__.c.data, - Element.__table__.c.data._copy(), + if ( + (in_map.value and lookup.B) + or in_map.no + or (in_map.yes and lookup.value) ): - if specify_nullable is True: - is_(col.nullable, True) - elif specify_identity: - is_(col.nullable, False) - elif specify_nullable is False: - is_(col.nullable, False) - elif not optional: - is_(col.nullable, False) - else: - is_(col.nullable, True) + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving " + "for Python type indicated by", + ): + declare() + else: + MyClass = declare() + eq_(MyClass.data.expression.type.precision, 10) - @testing.combinations( - ("default", lambda ctx: 10, lambda ctx: 15), - ("default", func.foo(), func.bar()), - ("onupdate", lambda ctx: 10, lambda ctx: 15), - ("onupdate", func.foo(), func.bar()), - ("server_onupdate", func.foo(), func.bar()), - ("server_default", func.foo(), func.bar()), - ("nullable", True, False), - ("nullable", False, True), - ("type", BigInteger(), Numeric()), - argnames="paramname, value, override_value", + @testing.variation( + "type_", + [ + "str_extension", + "str_typing", + "generic_extension", + "generic_typing", + "generic_typed_extension", + "generic_typed_typing", + ], ) - def test_dont_combine_args_from_pep593( - self, - decl_base: Type[DeclarativeBase], - paramname, - value, - override_value, + @testing.requires.python312 + def test_pep695_typealias_as_typemap_keys( + self, decl_base: Type[DeclarativeBase], type_ ): - # anno only: global intpk, element_ref - intpk = Annotated[int, mapped_column(primary_key=True)] - - args = [] - params = {} - override_args = [] - override_params = {} - if paramname == "type": - args.append(value) - override_args.append(override_value) - else: - params[paramname] = value - if paramname == "default": - override_params["insert_default"] = override_value - else: - override_params[paramname] = override_value + """test #10807, #12829""" - element_ref = Annotated[int, mapped_column(*args, **params)] - - class Element(decl_base): - __tablename__ = "element" + decl_base.registry.update_type_annotation_map( + { + _UnionPep695: JSON, + _StrPep695: String(30), + _TypingStrPep695: String(30), + _GenericPep695: String(30), + _TypingGenericPep695: String(30), + _GenericPep695Typed: String(30), + _TypingGenericPep695Typed: String(30), + } + ) - id: Mapped[intpk] + class Test(decl_base): + __tablename__ = "test" + id: Mapped[int] = mapped_column(primary_key=True) + if type_.str_extension: + data: Mapped[_StrPep695] + elif type_.str_typing: + data: Mapped[_TypingStrPep695] + elif type_.generic_extension: + data: Mapped[_GenericPep695] + elif type_.generic_typing: + data: Mapped[_TypingGenericPep695] + elif type_.generic_typed_extension: + data: Mapped[_GenericPep695Typed] + elif type_.generic_typed_typing: + data: Mapped[_TypingGenericPep695Typed] + else: + type_.fail() + structure: Mapped[_UnionPep695] - data: Mapped[element_ref] = mapped_column( - *override_args, **override_params - ) + eq_(Test.__table__.c.data.type._type_affinity, String) + eq_(Test.__table__.c.data.type.length, 30) + is_(Test.__table__.c.structure.type._type_affinity, JSON) - if paramname in ( - "default", - "onupdate", - "server_default", - "server_onupdate", - ): - default = getattr(Element.__table__.c.data, paramname) - is_(default.arg, override_value) - is_(default.column, Element.__table__.c.data) - elif paramname == "type": - assert type(Element.__table__.c.data.type) is type(override_value) - else: - is_(getattr(Element.__table__.c.data, paramname), override_value) + def test_pep484_newtypes_as_typemap_keys( + self, decl_base: Type[DeclarativeBase] + ): + # anno only: global str50, str30, str3050 - def test_use_existing_column_from_pep_593(self, decl_base): - """test #12787""" + str50 = NewType("str50", str) + str30 = NewType("str30", str) + str3050 = NewType("str30", str50) - # anno only: global Label - Label = Annotated[ - str, mapped_column(String(20), use_existing_column=True) - ] + decl_base.registry.update_type_annotation_map( + {str50: String(50), str30: String(30), str3050: String(150)} + ) - class A(decl_base): - __tablename__ = "table_a" + class MyClass(decl_base): + __tablename__ = "my_table" - id: Mapped[int] = mapped_column(primary_key=True) - discriminator: Mapped[int] + id: Mapped[str50] = mapped_column(primary_key=True) + data_one: Mapped[str30] + data_two: Mapped[str50] + data_three: Mapped[Optional[str30]] + data_four: Mapped[str3050] - __mapper_args__ = { - "polymorphic_on": "discriminator", - "polymorphic_abstract": True, - } + eq_(MyClass.__table__.c.data_one.type.length, 30) + is_false(MyClass.__table__.c.data_one.nullable) - class A_1(A): - label: Mapped[Label] + eq_(MyClass.__table__.c.data_two.type.length, 50) + is_false(MyClass.__table__.c.data_two.nullable) - __mapper_args__ = {"polymorphic_identity": 1} + eq_(MyClass.__table__.c.data_three.type.length, 30) + is_true(MyClass.__table__.c.data_three.nullable) - class A_2(A): - label: Mapped[Label] + eq_(MyClass.__table__.c.data_four.type.length, 150) + is_false(MyClass.__table__.c.data_four.nullable) - __mapper_args__ = {"polymorphic_identity": 2} + def test_newtype_missing_from_map(self, decl_base): + # anno only: global str50 - is_(A_1.label.property.columns[0], A_2.label.property.columns[0]) + str50 = NewType("str50", str) - eq_(A_1.label.property.columns[0].table, A.__table__) - eq_(A_2.label.property.columns[0].table, A.__table__) + with expect_raises_message( + orm_exc.MappedAnnotationError, + "Could not locate SQLAlchemy Core type when resolving for Python " + r"type indicated by '.*.str50' inside the Mapped\[\] annotation " + "for the 'data_one' attribute; the type object is not " + "resolvable by the registry", + ): + + class MyClass(decl_base): + __tablename__ = "my_table" + + id: Mapped[int] = mapped_column(primary_key=True) + data_one: Mapped[str50] @testing.variation( "union", @@ -2333,300 +2626,96 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL): json: Mapped[Optional[Union[List[int], List[str]]]] = mc json2: Mapped[Optional[Union[list[int], list[str]]]] = mc2 elif option.optional_fwd_ref: - json: Mapped["Optional[Union[List[int], List[str]]]"] = mc - json2: Mapped["Optional[Union[list[int], list[str]]]"] = ( - mc2 - ) - elif option.union_none: - json: Mapped[Union[List[int], List[str], None]] = mc - json2: Mapped[Union[None, list[int], list[str]]] = mc2 - elif option.pep604: - json: Mapped[list[int] | list[str] | None] = mc - json2: Mapped[None | list[int] | list[str]] = mc2 - elif option.pep604_fwd_ref: - json: Mapped["list[int] | list[str] | None"] = mc - json2: Mapped["None | list[int] | list[str]"] = mc2 - else: - brackets.fail() - - is_(A.__table__.c.json.type._type_affinity, JSON) - if hasattr(A, "json2"): - is_(A.__table__.c.json2.type._type_affinity, JSON) - if option.not_optional: - is_false(A.__table__.c.json2.nullable) - else: - is_true(A.__table__.c.json2.nullable) - - if option.not_optional: - is_false(A.__table__.c.json.nullable) - else: - is_true(A.__table__.c.json.nullable) - - @testing.variation("optional", [True, False]) - @testing.variation("provide_type", [True, False]) - @testing.variation("add_to_type_map", [True, False]) - def test_recursive_type( - self, decl_base, optional, provide_type, add_to_type_map - ): - """test #9553""" - - global T - - T = Dict[str, Optional["T"]] - - if not provide_type and not add_to_type_map: - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", - ): - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - - return - - else: - if add_to_type_map: - decl_base.registry.update_type_annotation_map({T: JSON()}) - - class TypeTest(decl_base): - __tablename__ = "my_table" - - id: Mapped[int] = mapped_column(primary_key=True) - - if add_to_type_map: - if optional: - type_test: Mapped[Optional[T]] = mapped_column() - else: - type_test: Mapped[T] = mapped_column() - else: - if optional: - type_test: Mapped[Optional[T]] = mapped_column(JSON()) - else: - type_test: Mapped[T] = mapped_column(JSON()) - - if optional: - is_(TypeTest.__table__.c.type_test.nullable, True) - else: - is_(TypeTest.__table__.c.type_test.nullable, False) - - self.assert_compile( - select(TypeTest), - "SELECT my_table.id, my_table.type_test FROM my_table", - ) - - def test_missing_mapped_lhs(self, decl_base): - with expect_annotation_syntax_error("User.name"): - - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: str = mapped_column() # type: ignore - - def test_construct_lhs_separate_name(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - data: Mapped[Optional[str]] = mapped_column("the_data") - - self.assert_compile( - select(User.data), "SELECT users.the_data FROM users" - ) - is_true(User.__table__.c.the_data.nullable) - - def test_construct_works_in_expr(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - - class Address(decl_base): - __tablename__ = "addresses" - - id: Mapped[int] = mapped_column(primary_key=True) - user_id: Mapped[int] = mapped_column(ForeignKey("users.id")) - - user = relationship(User, primaryjoin=user_id == User.id) - - self.assert_compile( - select(Address.user_id, User.id).join(Address.user), - "SELECT addresses.user_id, users.id FROM addresses " - "JOIN users ON addresses.user_id = users.id", - ) - - def test_construct_works_as_polymorphic_on(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - type: Mapped[str] = mapped_column() - - __mapper_args__ = {"polymorphic_on": type} - - decl_base.registry.configure() - is_(User.__table__.c.type, User.__mapper__.polymorphic_on) - - def test_construct_works_as_version_id_col(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - version_id: Mapped[int] = mapped_column() - - __mapper_args__ = {"version_id_col": version_id} - - decl_base.registry.configure() - is_(User.__table__.c.version_id, User.__mapper__.version_id_col) - - def test_construct_works_in_deferred(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = deferred(mapped_column()) - - self.assert_compile(select(User), "SELECT users.id FROM users") - self.assert_compile( - select(User).options(undefer(User.data)), - "SELECT users.id, users.data FROM users", - ) - - def test_deferred_kw(self, decl_base): - class User(decl_base): - __tablename__ = "users" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] = mapped_column(deferred=True) - - self.assert_compile(select(User), "SELECT users.id FROM users") - self.assert_compile( - select(User).options(undefer(User.data)), - "SELECT users.id, users.data FROM users", - ) - - @testing.combinations( - (str, types.String), - (Decimal, types.Numeric), - (float, types.Float), - (datetime.datetime, types.DateTime), - (uuid.UUID, types.Uuid), - argnames="pytype_arg,sqltype", - ) - def test_datatype_lookups(self, decl_base, pytype_arg, sqltype): - # anno only: global pytype - pytype = pytype_arg - - class MyClass(decl_base): - __tablename__ = "mytable" - id: Mapped[int] = mapped_column(primary_key=True) - - data: Mapped[pytype] - - assert isinstance(MyClass.__table__.c.data.type, sqltype) + json: Mapped["Optional[Union[List[int], List[str]]]"] = mc + json2: Mapped["Optional[Union[list[int], list[str]]]"] = ( + mc2 + ) + elif option.union_none: + json: Mapped[Union[List[int], List[str], None]] = mc + json2: Mapped[Union[None, list[int], list[str]]] = mc2 + elif option.pep604: + json: Mapped[list[int] | list[str] | None] = mc + json2: Mapped[None | list[int] | list[str]] = mc2 + elif option.pep604_fwd_ref: + json: Mapped["list[int] | list[str] | None"] = mc + json2: Mapped["None | list[int] | list[str]"] = mc2 + else: + brackets.fail() - def test_dont_ignore_unresolvable(self, decl_base): - """test #8888""" + is_(A.__table__.c.json.type._type_affinity, JSON) + if hasattr(A, "json2"): + is_(A.__table__.c.json2.type._type_affinity, JSON) + if option.not_optional: + is_false(A.__table__.c.json2.nullable) + else: + is_true(A.__table__.c.json2.nullable) - with expect_raises_message( - sa_exc.ArgumentError, - r"Could not resolve all types within mapped annotation: " - r"\".*Mapped\[.*fake.*\]\". Ensure all types are written " - r"correctly and are imported within the module in use.", - ): + if option.not_optional: + is_false(A.__table__.c.json.nullable) + else: + is_true(A.__table__.c.json.nullable) - class A(decl_base): - __tablename__ = "a" + @testing.variation("optional", [True, False]) + @testing.variation("provide_type", [True, False]) + @testing.variation("add_to_type_map", [True, False]) + def test_recursive_type( + self, decl_base, optional, provide_type, add_to_type_map + ): + """test #9553""" - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped["fake"] # noqa + global T - def test_type_dont_mis_resolve_on_superclass(self): - """test for #8859. + T = Dict[str, Optional["T"]] - For subclasses of a type that's in the map, don't resolve this - by default, even though we do a search through __mro__. + if not provide_type and not add_to_type_map: + with expect_raises_message( + sa_exc.ArgumentError, + r"Could not locate SQLAlchemy.*" r".*ForwardRef\('T'\).*", + ): - """ - # anno only: global int_sub + class TypeTest(decl_base): + __tablename__ = "my_table" - class int_sub(int): - pass + id: Mapped[int] = mapped_column(primary_key=True) + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() - Base = declarative_base( - type_annotation_map={ - int: Integer, - } - ) + return - with expect_raises_message( - orm_exc.MappedAnnotationError, - "Could not locate SQLAlchemy Core type", - ): + else: + if add_to_type_map: + decl_base.registry.update_type_annotation_map({T: JSON()}) - class MyClass(Base): - __tablename__ = "mytable" + class TypeTest(decl_base): + __tablename__ = "my_table" id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[int_sub] - - @testing.variation("dict_key", ["typing", "plain"]) - def test_type_dont_mis_resolve_on_non_generic(self, dict_key): - """test for #8859. - For a specific generic type with arguments, don't do any MRO - lookup. + if add_to_type_map: + if optional: + type_test: Mapped[Optional[T]] = mapped_column() + else: + type_test: Mapped[T] = mapped_column() + else: + if optional: + type_test: Mapped[Optional[T]] = mapped_column(JSON()) + else: + type_test: Mapped[T] = mapped_column(JSON()) - """ + if optional: + is_(TypeTest.__table__.c.type_test.nullable, True) + else: + is_(TypeTest.__table__.c.type_test.nullable, False) - Base = declarative_base( - type_annotation_map={ - dict: String, - } + self.assert_compile( + select(TypeTest), + "SELECT my_table.id, my_table.type_test FROM my_table", ) - with expect_raises_message( - sa_exc.ArgumentError, "Could not locate SQLAlchemy Core type" - ): - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - - if dict_key.plain: - data: Mapped[dict[str, str]] - elif dict_key.typing: - data: Mapped[Dict[str, str]] - - def test_type_secondary_resolution(self): - class MyString(String): - def _resolve_for_python_type( - self, python_type, matched_type, matched_on_flattened - ): - return String(length=42) - - Base = declarative_base(type_annotation_map={str: MyString}) - - class MyClass(Base): - __tablename__ = "mytable" - - id: Mapped[int] = mapped_column(primary_key=True) - data: Mapped[str] - - is_true(isinstance(MyClass.__table__.c.data.type, String)) - eq_(MyClass.__table__.c.data.type.length, 42) - -class EnumOrLiteralTypeMapTest(fixtures.TestBase, testing.AssertsCompiledSQL): +class ResolveToEnumTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = "default" @testing.variation("use_explicit_name", [True, False]) @@ -2757,6 +2846,102 @@ class EnumOrLiteralTypeMapTest(fixtures.TestBase, testing.AssertsCompiledSQL): is_(MyClass.__table__.c.data.type.enum_class, FooEnum) eq_(MyClass.__table__.c.data.type.name, "fooenum") # and not 'enum' + @testing.variation( + "type_", + [ + "literal", + "literal_typing", + "recursive", + "not_literal", + "not_literal_typing", + "generic", + "generic_typing", + "generic_typed", + "generic_typed_typing", + ], + ) + @testing.combinations(True, False, argnames="in_map") + @testing.requires.python312 + def test_pep695_literal_defaults_to_enum(self, decl_base, type_, in_map): + """test #11305.""" + + def declare(): + class Foo(decl_base): + __tablename__ = "footable" + + id: Mapped[int] = mapped_column(primary_key=True) + if type_.recursive: + status: Mapped[_RecursiveLiteral695] # noqa: F821 + elif type_.literal: + status: Mapped[_Literal695] # noqa: F821 + elif type_.literal_typing: + status: Mapped[_TypingLiteral695] # noqa: F821 + elif type_.not_literal: + status: Mapped[_StrPep695] # noqa: F821 + elif type_.not_literal_typing: + status: Mapped[_TypingStrPep695] # noqa: F821 + elif type_.generic: + status: Mapped[_GenericPep695] # noqa: F821 + elif type_.generic_typing: + status: Mapped[_TypingGenericPep695] # noqa: F821 + elif type_.generic_typed: + status: Mapped[_GenericPep695Typed] # noqa: F821 + elif type_.generic_typed_typing: + status: Mapped[_TypingGenericPep695Typed] # noqa: F821 + else: + type_.fail() + + return Foo + + if in_map: + decl_base.registry.update_type_annotation_map( + { + _Literal695: Enum(enum.Enum), # noqa: F821 + _TypingLiteral695: Enum(enum.Enum), # noqa: F821 + _RecursiveLiteral695: Enum(enum.Enum), # noqa: F821 + _StrPep695: Enum(enum.Enum), # noqa: F821 + _TypingStrPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695: Enum(enum.Enum), # noqa: F821 + _GenericPep695Typed: Enum(enum.Enum), # noqa: F821 + _TypingGenericPep695Typed: Enum(enum.Enum), # noqa: F821 + } + ) + if type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + else: + with expect_raises_message( + exc.ArgumentError, + "Can't associate TypeAliasType '.+' to an Enum " + "since it's not a direct alias of a Literal. Only " + "aliases in this form `type my_alias = Literal.'a', " + "'b'.` are supported when generating Enums.", + ): + declare() + elif type_.literal or type_.literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, Enum)) + eq_(col.type.enums, ["to-do", "in-progress", "done"]) + is_(col.type.native_enum, False) + elif type_.not_literal or type_.not_literal_typing: + Foo = declare() + col = Foo.__table__.c.status + is_true(isinstance(col.type, String)) + else: + with expect_raises_message( + orm_exc.MappedAnnotationError, + r"Could not locate SQLAlchemy Core type when resolving " + r"for Python type " + r"indicated by '.+' inside the Mapped\[\] " + r"annotation for the 'status' attribute", + ): + declare() + @testing.variation( "sqltype", [ @@ -3836,32 +4021,6 @@ class CompositeTest(fixtures.TestBase, testing.AssertsCompiledSQL): mapped_column(), mapped_column(), mapped_column("zip") ) - def test_extract_from_pep593(self, decl_base): - # anno only: global Address - - @dataclasses.dataclass - class Address: - street: str - state: str - zip_: str - - class User(decl_base): - __tablename__ = "user" - - id: Mapped[int] = mapped_column(primary_key=True) - name: Mapped[str] = mapped_column() - - address: Mapped[Annotated[Address, "foo"]] = composite( - mapped_column(), mapped_column(), mapped_column("zip") - ) - - self.assert_compile( - select(User), - 'SELECT "user".id, "user".name, "user".street, ' - '"user".state, "user".zip FROM "user"', - dialect="default", - ) - def test_cls_not_composite_compliant(self, decl_base): # anno only: global Address -- 2.47.3