=======
0.6.4
=====
+- orm
+ - Moving an o2m object from one collection to
+ another, or vice versa changing the referenced
+ object by an m2o, where the foreign key is also a
+ member of the primary key, will now be more
+ carefully checked during flush if the change in
+ value of the foreign key on the "many" side is the
+ result of a change in the primary key of the "one"
+ side, or if the "one" is just a different object.
+ In one case, a cascade-capable DB would have
+ cascaded the value already and we need to look at
+ the "new" PK value to do an UPDATE, in the other we
+ need to continue looking at the "old". We now look
+ at the "old", assuming passive_updates=True,
+ unless we know it was a PK switch that
+ triggered the change. [ticket:1856]
+
- sql
- Changed the scheme used to generate truncated
"auto" index names when using the "index=True"
self._synchronize(
state,
child,
- None, True, uowcommit)
+ None, True,
+ uowcommit, False)
if self.post_update and child:
self._post_update(child, uowcommit, [state])
self._synchronize(
state,
child,
- None, True, uowcommit)
+ None, True,
+ uowcommit, False)
if self.post_update and child:
self._post_update(child,
uowcommit,
passive=True)
if history:
for child in history.added:
- self._synchronize(state, child, None, False, uowcommit)
+ self._synchronize(state, child, None,
+ False, uowcommit, False)
if child is not None and self.post_update:
self._post_update(child, uowcommit, [state])
for child in history.deleted:
if not self.cascade.delete_orphan and \
not self.hasparent(child):
- self._synchronize(state, child, None, True, uowcommit)
+ self._synchronize(state, child, None, True,
+ uowcommit, False)
if self._pks_changed(uowcommit, state):
for child in history.unchanged:
self._synchronize(state, child, None,
- False, uowcommit)
+ False, uowcommit, True)
- def _synchronize(self, state, child, associationrow,
- clearkeys, uowcommit):
+ def _synchronize(self, state, child,
+ associationrow, clearkeys, uowcommit,
+ pks_changed):
source = state
dest = child
if dest is None or \
else:
sync.populate(source, self.parent, dest, self.mapper,
self.prop.synchronize_pairs, uowcommit,
- self.passive_updates)
+ self.passive_updates and pks_changed)
def _pks_changed(self, uowcommit, state):
return sync.source_modified(
self.parent,
self.prop.synchronize_pairs,
uowcommit,
- self.passive_updates
- )
+ False)
class DetectKeySwitch(DependencyProcessor):
"""For many-to-one relationships with no one-to-many backref,
from sqlalchemy.orm import exc, util as mapperutil
def populate(source, source_mapper, dest, dest_mapper,
- synchronize_pairs, uowcommit, passive_updates):
+ synchronize_pairs, uowcommit, flag_cascaded_pks):
for l, r in synchronize_pairs:
try:
value = source_mapper._get_state_attr_by_column(source, source.dict, l)
# reasons, since we only need this info for a primary key
# destination.
if l.primary_key and r.primary_key and \
- r.references(l) and passive_updates:
+ r.references(l) and flag_cascaded_pks:
uowcommit.attributes[("pk_cascaded", dest, r)] = True
def clear(dest, dest_mapper, synchronize_pairs):
def test_onetomany_nonpassive(self):
self._test_onetomany(False)
- def test_move_passive(self):
- self._test_move(True)
+ def test_o2m_change_passive(self):
+ self._test_o2m_change(True)
- def test_move_nonpassive(self):
- self._test_move(False)
+ def test_o2m_change_nonpassive(self):
+ self._test_o2m_change(False)
@testing.resolve_artifact_names
- def _test_move(self, passive_updates):
+ def _test_o2m_change(self, passive_updates):
"""Change the PK of a related entity to another.
"on update cascade" is not involved here, so the mapper has
a1.username = 'jack'
sess.flush()
+
+ def test_o2m_move_passive(self):
+ self._test_o2m_move(True)
+
+ def test_o2m_move_nonpassive(self):
+ self._test_o2m_move(False)
+
+ @testing.resolve_artifact_names
+ def _test_o2m_move(self, passive_updates):
+ """Move the related entity to a different collection,
+ changing its PK.
+
+ """
+ mapper(User, users, properties={
+ 'addresses':relationship(Address,
+ passive_updates=passive_updates)})
+ mapper(Address, addresses)
+
+ sess = create_session()
+ a1 = Address(username='ed', email='ed@host1')
+ u1 = User(username='ed', addresses=[a1])
+ u2 = User(username='jack')
+
+ sess.add_all([a1, u1, u2])
+ sess.flush()
+
+ u1.addresses.remove(a1)
+ u2.addresses.append(a1)
+ sess.flush()
+
+ def test_change_m2o_passive(self):
+ self._test_change_m2o(True)
+
+ @testing.fails_on_everything_except('sqlite', 'oracle', '+zxjdbc')
+ def test_change_m2o_nonpassive(self):
+ self._test_change_m2o(False)
+
+ @testing.resolve_artifact_names
+ def _test_change_m2o(self, passive_updates):
+ mapper(User, users)
+ mapper(Address, addresses, properties={
+ 'user':relationship(User, passive_updates=passive_updates)
+ })
+
+ sess = create_session()
+ u1 = User(username='jack')
+ a1 = Address(user=u1, email='foo@bar')
+ sess.add_all([u1, a1])
+ sess.flush()
+ u1.username='edmodified'
+ sess.flush()
+ eq_(a1.username, 'edmodified')
+
+ def test_move_m2o_passive(self):
+ self._test_move_m2o(True)
+
+ def test_move_m2o_nonpassive(self):
+ self._test_move_m2o(False)
+
+ @testing.resolve_artifact_names
+ def _test_move_m2o(self, passive_updates):
+ # tests [ticket:1856]
+ mapper(User, users)
+ mapper(Address, addresses, properties={
+ 'user':relationship(User, passive_updates=passive_updates)
+ })
+
+ sess = create_session()
+ u1 = User(username='jack')
+ u2 = User(username='ed')
+ a1 = Address(user=u1, email='foo@bar')
+ sess.add_all([u1, u2, a1])
+ sess.flush()
+
+ a1.user = u2
+ sess.flush()
+
+
@testing.resolve_artifact_names
def test_rowswitch_doesntfire(self):
mapper(User, users)