]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
return empty MappedColumn() at early pep-593 step
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 11 May 2023 02:03:18 +0000 (22:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 11 May 2023 02:05:45 +0000 (22:05 -0400)
Fixed issue in new ORM Annotated Declarative where using a
:class:`_schema.ForeignKey` (or other column-level constraint) inside of
:func:`_orm.mapped_column` which is then copied out to models via pep-593
``Annotated`` would apply duplicates of each constraint to the
:class:`_schema.Column` as produced in the target :class:`_schema.Table`,
leading to incorrect CREATE TABLE DDL as well as migration directives under
Alembic.

Fixes: #9766
Change-Id: I8a3b2716bf393d1d2b5894f9f72b45fa59df1e08

doc/build/changelog/unreleased_20/9766.rst [new file with mode: 0644]
lib/sqlalchemy/orm/properties.py
test/orm/declarative/test_tm_future_annotations_sync.py
test/orm/declarative/test_typed_mapping.py

diff --git a/doc/build/changelog/unreleased_20/9766.rst b/doc/build/changelog/unreleased_20/9766.rst
new file mode 100644 (file)
index 0000000..588ce17
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: bug, orm
+    :tickets: 9766
+
+    Fixed issue in new ORM Annotated Declarative where using a
+    :class:`_schema.ForeignKey` (or other column-level constraint) inside of
+    :func:`_orm.mapped_column` which is then copied out to models via pep-593
+    ``Annotated`` would apply duplicates of each constraint to the
+    :class:`_schema.Column` as produced in the target :class:`_schema.Table`,
+    leading to incorrect CREATE TABLE DDL as well as migration directives under
+    Alembic.
index 916b9d901d88332c6a5fa5cfe30e7b5d14ce4bc5..dd53a953627a9476599668842b2a7dba1dbb6eee 100644 (file)
@@ -592,7 +592,6 @@ class MappedColumn(
             None,
             SchemaConst.NULL_UNSPECIFIED,
         )
-
         util.set_creation_order(self)
 
     def _copy(self, **kw: Any) -> Self:
@@ -649,7 +648,9 @@ class MappedColumn(
         return op(col._bind_param(op, other), col, **kwargs)  # type: ignore[return-value]  # noqa: E501
 
     def found_in_pep593_annotated(self) -> Any:
-        return self._copy()
+        # return a blank mapped_column().  This mapped_column()'s
+        # Column will be merged into it in _init_column_for_annotation().
+        return MappedColumn()
 
     def declarative_scan(
         self,
@@ -751,13 +752,15 @@ class MappedColumn(
 
         if is_pep593(our_type):
             our_type_is_pep593 = True
+
             pep_593_components = typing_get_args(our_type)
             raw_pep_593_type = pep_593_components[0]
             if is_optional_union(raw_pep_593_type):
+                raw_pep_593_type = de_optionalize_union_types(raw_pep_593_type)
+
                 nullable = True
                 if not self._has_nullable:
                     self.column.nullable = nullable
-                raw_pep_593_type = de_optionalize_union_types(raw_pep_593_type)
             for elem in pep_593_components[1:]:
                 if isinstance(elem, MappedColumn):
                     use_args_from = elem
@@ -772,6 +775,7 @@ class MappedColumn(
                 and use_args_from.column.default is not None
             ):
                 self.column.default = None
+
             use_args_from.column._merge(self.column)
             sqltype = self.column.type
 
index c1eecf9707e1d2126dc4b35260e90fc218b45239..dd2c1cc7e6e08c4c02d736791c3035ad4aa5d7c2 100644 (file)
@@ -812,8 +812,10 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL):
         is_true(table.c.data_three.nullable)
         is_true(table.c.data_four.nullable)
 
+    @testing.variation("to_assert", ["ddl", "fkcount", "references"])
+    @testing.variation("assign_blank", [True, False])
     def test_extract_fk_col_from_pep593(
-        self, decl_base: Type[DeclarativeBase]
+        self, decl_base: Type[DeclarativeBase], to_assert, assign_blank
     ):
         global intpk, element_ref
         intpk = Annotated[int, mapped_column(primary_key=True)]
@@ -828,28 +830,58 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL):
             __tablename__ = "refone"
 
             id: Mapped[intpk]
-            other_id: Mapped[element_ref]
+
+            if assign_blank:
+                other_id: Mapped[element_ref] = mapped_column()
+            else:
+                other_id: Mapped[element_ref]
 
         class RefElementTwo(decl_base):
             __tablename__ = "reftwo"
 
             id: Mapped[intpk]
-            some_id: Mapped[element_ref]
+            if assign_blank:
+                some_id: Mapped[element_ref] = mapped_column()
+            else:
+                some_id: Mapped[element_ref]
 
         assert Element.__table__ is not None
         assert RefElementOne.__table__ is not None
         assert RefElementTwo.__table__ is not None
 
-        is_true(
-            RefElementOne.__table__.c.other_id.references(
-                Element.__table__.c.id
+        if to_assert.fkcount:
+            # test #9766
+            eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1)
+            eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1)
+        elif to_assert.references:
+            is_true(
+                RefElementOne.__table__.c.other_id.references(
+                    Element.__table__.c.id
+                )
             )
-        )
-        is_true(
-            RefElementTwo.__table__.c.some_id.references(
-                Element.__table__.c.id
+            is_true(
+                RefElementTwo.__table__.c.some_id.references(
+                    Element.__table__.c.id
+                )
             )
-        )
+
+        elif to_assert.ddl:
+            self.assert_compile(
+                CreateTable(RefElementOne.__table__),
+                "CREATE TABLE refone "
+                "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, "
+                "PRIMARY KEY (id), "
+                "FOREIGN KEY(other_id) REFERENCES element (id))",
+            )
+            self.assert_compile(
+                CreateTable(RefElementTwo.__table__),
+                "CREATE TABLE reftwo "
+                "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, "
+                "PRIMARY KEY (id), "
+                "FOREIGN KEY(some_id) REFERENCES element (id))",
+            )
+        else:
+            to_assert.fail()
 
     @testing.combinations(
         (collections.abc.Sequence, (str,), testing.requires.python310),
index 514a4335f2ff936ca2ea3f0b1acfac5124d3d8e9..bd7fa1667a1b281972a606c28a0b3f104b87e9ea 100644 (file)
@@ -803,8 +803,10 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL):
         is_true(table.c.data_three.nullable)
         is_true(table.c.data_four.nullable)
 
+    @testing.variation("to_assert", ["ddl", "fkcount", "references"])
+    @testing.variation("assign_blank", [True, False])
     def test_extract_fk_col_from_pep593(
-        self, decl_base: Type[DeclarativeBase]
+        self, decl_base: Type[DeclarativeBase], to_assert, assign_blank
     ):
         # anno only: global intpk, element_ref
         intpk = Annotated[int, mapped_column(primary_key=True)]
@@ -819,28 +821,58 @@ class MappedColumnTest(fixtures.TestBase, testing.AssertsCompiledSQL):
             __tablename__ = "refone"
 
             id: Mapped[intpk]
-            other_id: Mapped[element_ref]
+
+            if assign_blank:
+                other_id: Mapped[element_ref] = mapped_column()
+            else:
+                other_id: Mapped[element_ref]
 
         class RefElementTwo(decl_base):
             __tablename__ = "reftwo"
 
             id: Mapped[intpk]
-            some_id: Mapped[element_ref]
+            if assign_blank:
+                some_id: Mapped[element_ref] = mapped_column()
+            else:
+                some_id: Mapped[element_ref]
 
         assert Element.__table__ is not None
         assert RefElementOne.__table__ is not None
         assert RefElementTwo.__table__ is not None
 
-        is_true(
-            RefElementOne.__table__.c.other_id.references(
-                Element.__table__.c.id
+        if to_assert.fkcount:
+            # test #9766
+            eq_(len(RefElementOne.__table__.c.other_id.foreign_keys), 1)
+            eq_(len(RefElementTwo.__table__.c.some_id.foreign_keys), 1)
+        elif to_assert.references:
+            is_true(
+                RefElementOne.__table__.c.other_id.references(
+                    Element.__table__.c.id
+                )
             )
-        )
-        is_true(
-            RefElementTwo.__table__.c.some_id.references(
-                Element.__table__.c.id
+            is_true(
+                RefElementTwo.__table__.c.some_id.references(
+                    Element.__table__.c.id
+                )
             )
-        )
+
+        elif to_assert.ddl:
+            self.assert_compile(
+                CreateTable(RefElementOne.__table__),
+                "CREATE TABLE refone "
+                "(id INTEGER NOT NULL, other_id INTEGER NOT NULL, "
+                "PRIMARY KEY (id), "
+                "FOREIGN KEY(other_id) REFERENCES element (id))",
+            )
+            self.assert_compile(
+                CreateTable(RefElementTwo.__table__),
+                "CREATE TABLE reftwo "
+                "(id INTEGER NOT NULL, some_id INTEGER NOT NULL, "
+                "PRIMARY KEY (id), "
+                "FOREIGN KEY(some_id) REFERENCES element (id))",
+            )
+        else:
+            to_assert.fail()
 
     @testing.combinations(
         (collections.abc.Sequence, (str,), testing.requires.python310),