value_params[col] = history.added[0]
else:
params[col.key] = prop.get_col_value(col, history.added[0])
+
if col in pks:
+ # TODO: there is one case we want to use history.added for
+ # the PK value - when we know that the PK has already been
+ # updated via CASCADE. This information needs to get here
+ # somehow. see [ticket:1671]
+
if history.deleted:
- params[col._label] = prop.get_col_value(col, history.deleted[0])
+ # PK is changing - use the old value to locate the row
+ params[col._label] = \
+ prop.get_col_value(col, history.deleted[0])
hasdata = True
else:
# row switch logic can reach us here
# remove the pk from the update params so the update doesn't
# attempt to include the pk in the update statement
del params[col.key]
- params[col._label] = prop.get_col_value(col, history.added[0])
+ params[col._label] = \
+ prop.get_col_value(col, history.added[0])
else:
hasdata = True
elif col in pks:
eq_(User(username='fred', fullname='jack'), u1)
+class CascadeToFKPKTest(_base.MappedTest):
+ """A primary key mutation cascades onto a foreign key that is itself a primary key."""
+
+ @classmethod
+ def define_tables(cls, metadata):
+ if testing.against('oracle'):
+ fk_args = dict(deferrable=True, initially='deferred')
+ else:
+ fk_args = dict(onupdate='cascade')
+
+ Table('users', metadata,
+ Column('username', String(50), primary_key=True),
+ test_needs_fk=True)
+
+ Table('addresses', metadata,
+ Column('username', String(50),
+ ForeignKey('users.username', **fk_args),
+ primary_key=True
+ ),
+ Column('email', String(50), primary_key=True),
+ test_needs_fk=True
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class User(_base.ComparableEntity):
+ pass
+ class Address(_base.ComparableEntity):
+ pass
+
+
+ @testing.fails_on_everything_except('sqlite') # Ticket #1671
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
+ def test_onetomany_passive(self):
+ self._test_onetomany(True)
+
+ @testing.fails_on_everything_except('sqlite') # Ticket #1671
+ def test_onetomany_nonpassive(self):
+ self._test_onetomany(False)
+
+ def test_move_passive(self):
+ self._test_move(True)
+
+ def test_move_nonpassive(self):
+ self._test_move(False)
+
+ @testing.resolve_artifact_names
+ def _test_move(self, passive_updates):
+ """Change the PK of a related entity to another.
+
+ "on update cascade" is not involved here, so the mapper has
+ to do the UPDATE itself.
+
+ """
+ mapper(User, users, properties={
+ 'addresses':relation(Address, passive_updates=passive_updates)})
+ mapper(Address, addresses)
+
+ sess = create_session()
+ a1 = Address(username='ed', email='ed@host1')
+ u1 = User(username='ed', addresses=[a1])
+ u2 = User(username='jack')
+
+ sess.add_all([a1, u1, u2])
+ sess.flush()
+
+ a1.username = 'jack'
+ sess.flush()
+
+
+
+ @testing.resolve_artifact_names
+ def _test_onetomany(self, passive_updates):
+ """Change the PK of a related entity via foreign key cascade.
+
+ For databases that require "on update cascade", the mapper
+ has to identify the row by the new value, not the old, when
+ it does the update.
+
+ """
+ mapper(User, users, properties={
+ 'addresses':relation(Address, passive_updates=passive_updates)})
+ mapper(Address, addresses)
+
+ sess = create_session()
+ a1, a2 = Address(username='ed', email='ed@host1'), Address(username='ed', email='ed@host2')
+ u1 = User(username='ed', addresses=[a1, a2])
+ sess.add(u1)
+ sess.flush()
+ eq_(a1.username, 'ed')
+ eq_(a2.username, 'ed')
+ eq_(sa.select([addresses.c.username]).execute().fetchall(), [('ed',), ('ed',)])
+
+ u1.username = 'jack'
+ a2.email='ed@host3'
+ sess.flush()
+
+ eq_(a1.username, 'jack')
+ eq_(a2.username, 'jack')
+ eq_(sa.select([addresses.c.username]).execute().fetchall(), [('jack',), ('jack', )])