]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- added a failing-so-far test for #1671
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 1 Feb 2010 23:38:51 +0000 (23:38 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 1 Feb 2010 23:38:51 +0000 (23:38 +0000)
lib/sqlalchemy/orm/dependency.py
lib/sqlalchemy/orm/mapper.py
test/orm/test_naturalpks.py

index 8276acdd00bb9aaec5a9fb1be4c959728f35c44a..6a9c80ebf5d8a9dd59d196fae3a9d867e053b54c 100644 (file)
@@ -267,7 +267,8 @@ class OneToManyDP(DependencyProcessor):
                                     isdelete=True)
                 if self._pks_changed(uowcommit, state):
                     if not history:
-                        history = uowcommit.get_attribute_history(state, self.key, passive=self.passive_updates)
+                        history = uowcommit.get_attribute_history(
+                                            state, self.key, passive=self.passive_updates)
                     if history:
                         for child in history.unchanged:
                             if child is not None:
index 0d4e2adaaf4a2f4c6be19cdef387e519e4b49259..bde7fd2e1ac427530d9f655285cb1f7a5c4ca46c 100644 (file)
@@ -1400,16 +1400,25 @@ class Mapper(object):
                                     value_params[col] = history.added[0]
                                 else:
                                     params[col.key] = prop.get_col_value(col, history.added[0])
+
                                 if col in pks:
+                                    # TODO: there is one case we want to use history.added for
+                                    # the PK value - when we know that the PK has already been
+                                    # updated via CASCADE.   This information needs to get here
+                                    # somehow.  see [ticket:1671]
+                                    
                                     if history.deleted:
-                                        params[col._label] = prop.get_col_value(col, history.deleted[0])
+                                        # PK is changing - use the old value to locate the row
+                                        params[col._label] = \
+                                                prop.get_col_value(col, history.deleted[0])
                                         hasdata = True
                                     else:
                                         # row switch logic can reach us here
                                         # remove the pk from the update params so the update doesn't
                                         # attempt to include the pk in the update statement
                                         del params[col.key]
-                                        params[col._label] = prop.get_col_value(col, history.added[0])
+                                        params[col._label] = \
+                                                    prop.get_col_value(col, history.added[0])
                                 else:
                                     hasdata = True
                             elif col in pks:
index 919c8f83c5884abe925c38e3382a23eadc9efe4a..277d1ef24a6f8705dec95ad8f8439758978f4651 100644 (file)
@@ -557,3 +557,103 @@ class NonPKCascadeTest(_base.MappedTest):
         eq_(User(username='fred', fullname='jack'), u1)
 
 
+class CascadeToFKPKTest(_base.MappedTest):
+    """A primary key mutation cascades onto a foreign key that is itself a primary key."""
+    
+    @classmethod
+    def define_tables(cls, metadata):
+        if testing.against('oracle'):
+            fk_args = dict(deferrable=True, initially='deferred')
+        else:
+            fk_args = dict(onupdate='cascade')
+
+        Table('users', metadata,
+            Column('username', String(50), primary_key=True),
+            test_needs_fk=True)
+
+        Table('addresses', metadata,
+                Column('username', String(50), 
+                       ForeignKey('users.username', **fk_args),
+                       primary_key=True
+                       ),
+              Column('email', String(50), primary_key=True),
+                 test_needs_fk=True
+                 )
+
+    @classmethod
+    def setup_classes(cls):
+        class User(_base.ComparableEntity):
+            pass
+        class Address(_base.ComparableEntity):
+            pass
+    
+        
+    @testing.fails_on_everything_except('sqlite') # Ticket #1671
+    @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
+    def test_onetomany_passive(self):
+        self._test_onetomany(True)
+
+    @testing.fails_on_everything_except('sqlite') # Ticket #1671
+    def test_onetomany_nonpassive(self):
+        self._test_onetomany(False)
+        
+    def test_move_passive(self):
+        self._test_move(True)
+        
+    def test_move_nonpassive(self):
+        self._test_move(False)
+
+    @testing.resolve_artifact_names
+    def _test_move(self, passive_updates):
+        """Change the PK of a related entity to another.
+        
+        "on update cascade" is not involved here, so the mapper has 
+        to do the UPDATE itself.
+        
+        """
+        mapper(User, users, properties={
+            'addresses':relation(Address, passive_updates=passive_updates)})
+        mapper(Address, addresses)
+
+        sess = create_session()
+        a1 = Address(username='ed', email='ed@host1')
+        u1 = User(username='ed', addresses=[a1])
+        u2 = User(username='jack')
+        
+        sess.add_all([a1, u1, u2])
+        sess.flush()
+        
+        a1.username = 'jack'
+        sess.flush()
+        
+        
+        
+    @testing.resolve_artifact_names
+    def _test_onetomany(self, passive_updates):
+        """Change the PK of a related entity via foreign key cascade.
+        
+        For databases that require "on update cascade", the mapper 
+        has to identify the row by the new value, not the old, when
+        it does the update.
+        
+        """
+        mapper(User, users, properties={
+            'addresses':relation(Address, passive_updates=passive_updates)})
+        mapper(Address, addresses)
+    
+        sess = create_session()
+        a1, a2 = Address(username='ed', email='ed@host1'), Address(username='ed', email='ed@host2')
+        u1 = User(username='ed', addresses=[a1, a2])
+        sess.add(u1)
+        sess.flush()
+        eq_(a1.username, 'ed')
+        eq_(a2.username, 'ed')
+        eq_(sa.select([addresses.c.username]).execute().fetchall(), [('ed',), ('ed',)])
+        
+        u1.username = 'jack'
+        a2.email='ed@host3'
+        sess.flush()
+
+        eq_(a1.username, 'jack')
+        eq_(a2.username, 'jack')
+        eq_(sa.select([addresses.c.username]).execute().fetchall(), [('jack',), ('jack', )])