]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
[ticket:728] foreign key checks for existing reflected FK and replaces itself
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Sep 2007 21:09:50 +0000 (21:09 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Sep 2007 21:09:50 +0000 (21:09 +0000)
CHANGES
lib/sqlalchemy/schema.py
test/engine/reflection.py

diff --git a/CHANGES b/CHANGES
index c7e7b4db78d1f83389cd1e89a6d8471e70dcaf82..8a8b897a2a823abd929946cb6eb76373bdf8ccd7 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -37,7 +37,7 @@ CHANGES
   as it does in 0.3 since ~(x==y) compiles to "x != y", but still applies
   to operators like BETWEEN.
 
-- other tickets: [ticket:768]
+- other tickets: [ticket:768], [ticket:728]
 
 0.4.0beta5
 ----------
index 1dacad3de98ff635602f50261e9499ebdca0ec34..212fdac82acf9334b5ef431d531877a26466c2d7 100644 (file)
@@ -507,6 +507,7 @@ class Column(SchemaItem, expression._ColumnClause):
         if getattr(self, 'table', None) is not None:
             raise exceptions.ArgumentError("this Column already has a table!")
         if not self._is_oid:
+            self._pre_existing_column = table._columns.get(self.key)
             table._columns.add(self)
         if self.primary_key:
             table.primary_key.add(self)
@@ -674,7 +675,14 @@ class ForeignKey(SchemaItem):
 
     def _set_parent(self, column):
         self.parent = column
-
+        
+        if self.parent._pre_existing_column is not None:
+            # remove existing FK which matches us
+            for fk in self.parent._pre_existing_column.foreign_keys:
+                if fk._colspec == self._colspec:
+                    self.parent.table.foreign_keys.remove(fk)
+                    self.parent.table.constraints.remove(fk.constraint)
+            
         if self.constraint is None and isinstance(self.parent.table, Table):
             self.constraint = ForeignKeyConstraint([],[], use_alter=self.use_alter, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete)
             self.parent.table.append_constraint(self.constraint)
index c70dc3f2eb833dbe100589cdba4bca2c14312863..1f2b4f56ace008ca877b12f07a52f9cd9d4a6b25 100644 (file)
@@ -125,8 +125,10 @@ class ReflectionTest(PersistTest):
         finally:
             meta.drop_all()
             
-    def testoverridecolumns(self):
-        """test that you can override columns which contain foreign keys to other reflected tables"""
+    def test_override_create_fkcols(self):
+        """test that you can override columns and create new foreign keys to other reflected tables.
+        this is common with MySQL MyISAM tables."""
+        
         meta = MetaData(testbase.db)
         users = Table('users', meta, 
             Column('id', Integer, primary_key=True),
@@ -144,7 +146,10 @@ class ReflectionTest(PersistTest):
                 autoload=True)
             u2 = Table('users', meta2, autoload=True)
             
-            assert len(a2.c.user_id.foreign_keys)>0
+            assert len(a2.c.user_id.foreign_keys) == 1
+            assert len(a2.foreign_keys) == 1
+            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+            assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
             assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
             assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
 
@@ -176,7 +181,7 @@ class ReflectionTest(PersistTest):
         finally:
             meta.drop_all()
 
-    def testoverridecolumns2(self):
+    def test_override_fkandpkcol(self):
         """test that you can override columns which contain foreign keys to other reflected tables,
         where the foreign key column is also a primary key column"""
         meta = MetaData(testbase.db)
@@ -222,6 +227,40 @@ class ReflectionTest(PersistTest):
         finally:
             meta.drop_all()
     
+    def test_override_existing_fkcols(self):
+        """test that you can override columns and specify new foreign keys to other reflected tables,
+        on columns which *do* already have that foreign key, and that the FK is not duped.
+        """
+        
+        meta = MetaData(testbase.db)
+        users = Table('users', meta, 
+            Column('id', Integer, primary_key=True),
+            Column('name', String(30)),
+            test_needs_fk=True)
+        addresses = Table('addresses', meta,
+            Column('id', Integer,primary_key=True),
+            Column('user_id', Integer, ForeignKey('users.id')),
+            test_needs_fk=True)
+            
+
+        meta.create_all()            
+        try:
+            meta2 = MetaData(testbase.db)
+            a2 = Table('addresses', meta2, 
+                Column('user_id',Integer, ForeignKey('users.id')),
+                autoload=True)
+            u2 = Table('users', meta2, autoload=True)
+            
+            assert len(a2.foreign_keys) == 1
+            assert len(a2.c.user_id.foreign_keys) == 1
+            assert len(a2.constraints) == 2
+            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+            assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
+            assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
+            assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+        finally:
+            meta.drop_all()
+        
     def test_pks_not_uniques(self):
         """test that primary key reflection not tripped up by unique indexes"""
         testbase.db.execute("""