]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- The "auto-attach" feature of constraints such as :class:`.UniqueConstraint`
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 24 Mar 2015 14:55:29 +0000 (10:55 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 24 Mar 2015 14:55:29 +0000 (10:55 -0400)
and :class:`.CheckConstraint` has been further enhanced such that
when the constraint is associated with non-table-bound :class:`.Column`
objects, the constraint will set up event listeners with the
columns themselves such that the constraint auto attaches at the
same time the columns are associated with the table.  This in particular
helps in some edge cases in declarative but is also of general use.
fixes #3341

doc/build/changelog/changelog_10.rst
lib/sqlalchemy/sql/schema.py
test/ext/declarative/test_inheritance.py
test/sql/test_constraints.py

index 2caa48a5e8db7c1c0526f152f32e3c4e557e8cc1..3bcb4d6cfcb7b3ffc58720609e1845e6b43bf427 100644 (file)
 .. changelog::
     :version: 1.0.0b4
 
+    .. change::
+        :tags: feature, schema
+        :tickets: 3341
+
+        The "auto-attach" feature of constraints such as :class:`.UniqueConstraint`
+        and :class:`.CheckConstraint` has been further enhanced such that
+        when the constraint is associated with non-table-bound :class:`.Column`
+        objects, the constraint will set up event listeners with the
+        columns themselves such that the constraint auto attaches at the
+        same time the columns are associated with the table.  This in particular
+        helps in some edge cases in declarative but is also of general use.
+
     .. change::
         :tags: bug, sql
         :tickets: 3340
index e022c57682dcbca8150267a069c71dab7896f591..3aeba9804055a08a0b4f5962f85d662a23345b88 100644 (file)
@@ -2390,24 +2390,44 @@ class ColumnCollectionMixin(object):
         self._pending_colargs = [_to_schema_column_or_string(c)
                                  for c in columns]
         if _autoattach and self._pending_colargs:
-            columns = [
-                c for c in self._pending_colargs
-                if isinstance(c, Column) and
-                isinstance(c.table, Table)
-            ]
+            self._check_attach()
 
-            tables = set([c.table for c in columns])
-            if len(tables) == 1:
-                self._set_parent_with_dispatch(tables.pop())
-            elif len(tables) > 1 and not self._allow_multiple_tables:
-                table = columns[0].table
-                others = [c for c in columns[1:] if c.table is not table]
-                if others:
-                    raise exc.ArgumentError(
-                        "Column(s) %s are not part of table '%s'." %
-                        (", ".join("'%s'" % c for c in others),
-                            table.description)
-                    )
+    def _check_attach(self, evt=False):
+        col_objs = [
+            c for c in self._pending_colargs
+            if isinstance(c, Column)
+        ]
+        cols_w_table = [
+            c for c in col_objs if isinstance(c.table, Table)
+        ]
+        cols_wo_table = set(col_objs).difference(cols_w_table)
+
+        if cols_wo_table:
+            assert not evt, "Should not reach here on event call"
+
+            def _col_attached(column, table):
+                cols_wo_table.discard(column)
+                if not cols_wo_table:
+                    self._check_attach(evt=True)
+            self._cols_wo_table = cols_wo_table
+            for col in cols_wo_table:
+                col._on_table_attach(_col_attached)
+            return
+
+        columns = cols_w_table
+
+        tables = set([c.table for c in columns])
+        if len(tables) == 1:
+            self._set_parent_with_dispatch(tables.pop())
+        elif len(tables) > 1 and not self._allow_multiple_tables:
+            table = columns[0].table
+            others = [c for c in columns[1:] if c.table is not table]
+            if others:
+                raise exc.ArgumentError(
+                    "Column(s) %s are not part of table '%s'." %
+                    (", ".join("'%s'" % c for c in others),
+                        table.description)
+                )
 
     def _set_parent(self, table):
         for col in self._pending_colargs:
index 6ea37e4d3b13843e8fbc3567f551004e2892cf80..2ecee99fdd39eab34a2c4b3ac91b29d037cf0a31 100644 (file)
@@ -485,6 +485,41 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
                                            ).one(),
             Engineer(name='vlad', primary_language='cobol'))
 
+    def test_single_constraint_on_sub(self):
+        """test the somewhat unusual case of [ticket:3341]"""
+
+        class Person(Base, fixtures.ComparableEntity):
+
+            __tablename__ = 'people'
+            id = Column(Integer, primary_key=True,
+                        test_needs_autoincrement=True)
+            name = Column(String(50))
+            discriminator = Column('type', String(50))
+            __mapper_args__ = {'polymorphic_on': discriminator}
+
+        class Engineer(Person):
+
+            __mapper_args__ = {'polymorphic_identity': 'engineer'}
+            primary_language = Column(String(50))
+
+            __hack_args_one__ = sa.UniqueConstraint(
+                Person.name, primary_language)
+            __hack_args_two__ = sa.CheckConstraint(
+                Person.name != primary_language)
+
+        uq = [c for c in Person.__table__.constraints
+              if isinstance(c, sa.UniqueConstraint)][0]
+        ck = [c for c in Person.__table__.constraints
+              if isinstance(c, sa.CheckConstraint)][0]
+        eq_(
+            list(uq.columns),
+            [Person.__table__.c.name, Person.__table__.c.primary_language]
+        )
+        eq_(
+            list(ck.columns),
+            [Person.__table__.c.name, Person.__table__.c.primary_language]
+        )
+
     @testing.skip_if(lambda: testing.against('oracle'),
                      "Test has an empty insert in it at the moment")
     def test_columns_single_inheritance_conflict_resolution(self):
index eb558fc9510595ff5e0aaf84df21af25b7882d53..d024e1a27f7d52b72fa1eb813466f58b98ba9c05 100644 (file)
@@ -1052,6 +1052,103 @@ class ConstraintAPITest(fixtures.TestBase):
         assert c not in t.constraints
         assert c not in t2.constraints
 
+    def test_auto_append_ck_on_col_attach_one(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        ck = CheckConstraint(a > b)
+
+        t = Table('tbl', m, a, b)
+        assert ck in t.constraints
+
+    def test_auto_append_ck_on_col_attach_two(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        c = Column('c', Integer)
+        ck = CheckConstraint(a > b + c)
+
+        t = Table('tbl', m, a)
+        assert ck not in t.constraints
+
+        t.append_column(b)
+        assert ck not in t.constraints
+
+        t.append_column(c)
+        assert ck in t.constraints
+
+    def test_auto_append_ck_on_col_attach_three(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        c = Column('c', Integer)
+        ck = CheckConstraint(a > b + c)
+
+        t = Table('tbl', m, a)
+        assert ck not in t.constraints
+
+        t.append_column(b)
+        assert ck not in t.constraints
+
+        t2 = Table('t2', m)
+        t2.append_column(c)
+
+        # two different tables, so CheckConstraint does nothing.
+        assert ck not in t.constraints
+
+    def test_auto_append_uq_on_col_attach_one(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        uq = UniqueConstraint(a, b)
+
+        t = Table('tbl', m, a, b)
+        assert uq in t.constraints
+
+    def test_auto_append_uq_on_col_attach_two(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        c = Column('c', Integer)
+        uq = UniqueConstraint(a, b, c)
+
+        t = Table('tbl', m, a)
+        assert uq not in t.constraints
+
+        t.append_column(b)
+        assert uq not in t.constraints
+
+        t.append_column(c)
+        assert uq in t.constraints
+
+    def test_auto_append_uq_on_col_attach_three(self):
+        m = MetaData()
+
+        a = Column('a', Integer)
+        b = Column('b', Integer)
+        c = Column('c', Integer)
+        uq = UniqueConstraint(a, b, c)
+
+        t = Table('tbl', m, a)
+        assert uq not in t.constraints
+
+        t.append_column(b)
+        assert uq not in t.constraints
+
+        t2 = Table('t2', m)
+
+        # two different tables, so UniqueConstraint raises
+        assert_raises_message(
+            exc.ArgumentError,
+            r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
+            t2.append_column, c
+        )
+
     def test_index_asserts_cols_standalone(self):
         metadata = MetaData()