instance which will require save/update/delete is properly added to the UOWTransaction."""
raise NotImplementedError()
- def _synchronize(self, obj, child, associationrow, clearkeys):
+ def _synchronize(self, obj, child, associationrow, clearkeys, uowcommit):
"""called during a flush to synchronize primary key identifier values between a parent/child object, as well as
to an associationrow in the case of many-to-many."""
raise NotImplementedError()
if childlist is not None:
for child in childlist.deleted_items():
if child is not None and childlist.hasparent(child) is False:
- self._synchronize(obj, child, None, True)
+ self._synchronize(obj, child, None, True, uowcommit)
self._conditional_post_update(child, uowcommit, [obj])
for child in childlist.unchanged_items():
if child is not None:
- self._synchronize(obj, child, None, True)
+ self._synchronize(obj, child, None, True, uowcommit)
self._conditional_post_update(child, uowcommit, [obj])
else:
for obj in deplist:
childlist = self.get_object_dependencies(obj, uowcommit, passive=True)
if childlist is not None:
for child in childlist.added_items():
- self._synchronize(obj, child, None, False)
+ self._synchronize(obj, child, None, False, uowcommit)
self._conditional_post_update(child, uowcommit, [obj])
for child in childlist.deleted_items():
if not self.cascade.delete_orphan:
- self._synchronize(obj, child, None, True)
+ self._synchronize(obj, child, None, True, uowcommit)
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
for c in self.mapper.cascade_iterator('delete', child):
uowcommit.register_object(c, isdelete=True)
- def _synchronize(self, obj, child, associationrow, clearkeys):
+ def _synchronize(self, obj, child, associationrow, clearkeys, uowcommit):
source = obj
dest = child
- if dest is None:
+ if dest is None or (not self.post_update and uowcommit.is_deleted(dest)):
return
self.syncrules.execute(source, dest, obj, child, clearkeys)
# post_update means we have to update our row to not reference the child object
# before we can DELETE the row
for obj in deplist:
- self._synchronize(obj, None, None, True)
+ self._synchronize(obj, None, None, True, uowcommit)
childlist = self.get_object_dependencies(obj, uowcommit, passive=self.passive_deletes)
if childlist is not None:
self._conditional_post_update(obj, uowcommit, childlist.deleted_items() + childlist.unchanged_items() + childlist.added_items())
childlist = self.get_object_dependencies(obj, uowcommit, passive=True)
if childlist is not None:
for child in childlist.added_items():
- self._synchronize(obj, child, None, False)
+ self._synchronize(obj, child, None, False, uowcommit)
self._conditional_post_update(obj, uowcommit, childlist.deleted_items() + childlist.unchanged_items() + childlist.added_items())
-
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " PRE process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if self.post_update:
for c in self.mapper.cascade_iterator('delete', child):
uowcommit.register_object(c, isdelete=True)
- def _synchronize(self, obj, child, associationrow, clearkeys):
+ def _synchronize(self, obj, child, associationrow, clearkeys, uowcommit):
source = child
dest = obj
- if dest is None:
+ if dest is None or (not self.post_update and uowcommit.is_deleted(dest)):
return
self.syncrules.execute(source, dest, obj, child, clearkeys)
if childlist is not None:
for child in childlist.deleted_items() + childlist.unchanged_items():
associationrow = {}
- self._synchronize(obj, child, associationrow, False)
+ self._synchronize(obj, child, associationrow, False, uowcommit)
secondary_delete.append(associationrow)
else:
for obj in deplist:
if childlist is None: continue
for child in childlist.added_items():
associationrow = {}
- self._synchronize(obj, child, associationrow, False)
+ self._synchronize(obj, child, associationrow, False, uowcommit)
secondary_insert.append(associationrow)
for child in childlist.deleted_items():
associationrow = {}
- self._synchronize(obj, child, associationrow, False)
+ self._synchronize(obj, child, associationrow, False, uowcommit)
secondary_delete.append(associationrow)
if len(secondary_delete):
secondary_delete.sort()
uowcommit.register_object(child, isdelete=True)
for c in self.mapper.cascade_iterator('delete', child):
uowcommit.register_object(c, isdelete=True)
- def _synchronize(self, obj, child, associationrow, clearkeys):
+ def _synchronize(self, obj, child, associationrow, clearkeys, uowcommit):
dest = associationrow
source = None
if dest is None:
j.pages[1].current_version = 12
s.delete(j)
s.flush()
+
+class RelationTest4(testbase.ORMTest):
+ """test syncrules on foreign keys that are also primary"""
+ def define_tables(self, metadata):
+ global tableA, tableB
+ tableA = Table("A", metadata,
+ Column("id",Integer,primary_key=True),
+ Column("foo",Integer,),
+ )
+ tableB = Table("B",metadata,
+ Column("id",Integer,ForeignKey("A.id"),primary_key=True),
+ )
+ def test_no_delete_PK_AtoB(self):
+ """test that A cant be deleted without B because B would have no PK value"""
+ class A(object):pass
+ class B(object):pass
+ mapper(A, tableA, properties={
+ 'bs':relation(B, cascade="save-update")
+ })
+ mapper(B, tableB)
+ a1 = A()
+ a1.bs.append(B())
+ sess = create_session()
+ sess.save(a1)
+ sess.flush()
+ sess.delete(a1)
+ try:
+ sess.flush()
+ assert False
+ except exceptions.AssertionError, e:
+ assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
+
+ def test_no_delete_PK_BtoA(self):
+ class A(object):pass
+ class B(object):pass
+ mapper(B, tableB, properties={
+ 'a':relation(A, cascade="save-update")
+ })
+ mapper(A, tableA)
+ b1 = B()
+ a1 = A()
+ b1.a = a1
+ sess = create_session()
+ sess.save(b1)
+ sess.flush()
+ b1.a = None
+ try:
+ sess.flush()
+ assert False
+ except exceptions.AssertionError, e:
+ assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
+
+ def test_delete_cascade_BtoA(self):
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ cascade"""
+ class A(object):pass
+ class B(object):pass
+ for cascade in (
+ "save-update, delete",
+ "save-update, delete-orphan",
+ "save-update, delete, delete-orphan"):
+
+ mapper(B, tableB, properties={
+ 'a':relation(A, cascade=cascade)
+ })
+ mapper(A, tableA)
+ b1 = B()
+ a1 = A()
+ b1.a = a1
+ sess = create_session()
+ sess.save(b1)
+ sess.flush()
+ sess.delete(b1)
+ sess.flush()
+ assert a1 not in sess
+ assert b1 not in sess
+ sess.clear()
+ clear_mappers()
+
+ def test_delete_cascade_AtoB(self):
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ cascade"""
+ class A(object):pass
+ class B(object):pass
+ for cascade in (
+ "save-update, delete",
+ "save-update, delete-orphan",
+ "save-update, delete, delete-orphan"):
+ mapper(A, tableA, properties={
+ 'bs':relation(B, cascade=cascade)
+ })
+ mapper(B, tableB)
+ a1 = A()
+ b1 = B()
+ a1.bs.append(b1)
+ sess = create_session()
+ sess.save(a1)
+ sess.flush()
-
+ sess.delete(a1)
+ sess.flush()
+ assert a1 not in sess
+ assert b1 not in sess
+ sess.clear()
+ clear_mappers()
+
+ def test_delete_manual_AtoB(self):
+ class A(object):pass
+ class B(object):pass
+ mapper(A, tableA, properties={
+ 'bs':relation(B, cascade="none")
+ })
+ mapper(B, tableB)
+ a1 = A()
+ b1 = B()
+ a1.bs.append(b1)
+ sess = create_session()
+ sess.save(a1)
+ sess.save(b1)
+ sess.flush()
+
+ sess.delete(a1)
+ sess.delete(b1)
+ sess.flush()
+ assert a1 not in sess
+ assert b1 not in sess
+ sess.clear()
+
+ def test_delete_manual_BtoA(self):
+ class A(object):pass
+ class B(object):pass
+ mapper(B, tableB, properties={
+ 'a':relation(A, cascade="none")
+ })
+ mapper(A, tableA)
+ b1 = B()
+ a1 = A()
+ b1.a = a1
+ sess = create_session()
+ sess.save(b1)
+ sess.save(a1)
+ sess.flush()
+ sess.delete(b1)
+ sess.delete(a1)
+ sess.flush()
+ assert a1 not in sess
+ assert b1 not in sess
+
if __name__ == "__main__":
testbase.main()