if delete:
# head object is being deleted, and we manage its list of child objects
# the child objects have to have their foreign key to the parent set to NULL
- if not self.post_update and not self.cascade.delete and not self.passive_deletes=='all':
+ if not self.post_update:
+ should_null_fks = not self.cascade.delete and not self.passive_deletes=='all'
for state in deplist:
(added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key,passive=self.passive_deletes)
if unchanged or deleted:
for child in deleted:
if child is not None and self.hasparent(child) is False:
- uowcommit.register_object(child)
- for child in unchanged:
- if child is not None:
- uowcommit.register_object(child)
+ if self.cascade.delete_orphan:
+ uowcommit.register_object(child, isdelete=True)
+ else:
+ uowcommit.register_object(child)
+ if should_null_fks:
+ for child in unchanged:
+ if child is not None:
+ uowcommit.register_object(child)
else:
for state in deplist:
(added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key,passive=True)
uowcommit.register_object(child)
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
- if child is not None:
- child = getattr(child, '_state', child)
source = state
dest = child
if dest is None or (not self.post_update and uowcommit.is_deleted(dest)):
if self.post_update:
return
if delete:
- if self.cascade.delete:
+ if self.cascade.delete or self.cascade.delete_orphan:
for state in deplist:
(added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key,passive=self.passive_deletes)
- if deleted or unchanged:
- for child in deleted + unchanged:
- if child is not None and self.hasparent(child) is False:
- uowcommit.register_object(child, isdelete=True)
- for c, m in self.mapper.cascade_iterator('delete', child):
- uowcommit.register_object(c._state, isdelete=True)
+ if self.cascade.delete_orphan:
+ todelete = added + unchanged + deleted
+ else:
+ todelete = added + unchanged
+ for child in todelete:
+ if child is None:
+ continue
+ uowcommit.register_object(child, isdelete=True)
+ for c, m in self.mapper.cascade_iterator('delete', child):
+ uowcommit.register_object(c._state, isdelete=True)
else:
for state in deplist:
uowcommit.register_object(state)
assert users.count().scalar() == 1
assert orders.count().scalar() == 1
self.assertEquals(sess.query(User).all(), [User(name='newuser', orders=[Order(description='someorder')])])
+
+ def test_cascade_delete_plusorphans(self):
+ sess = create_session()
+ u = User(name='jack', orders=[Order(description='someorder'), Order(description='someotherorder')])
+ sess.save(u)
+ sess.flush()
+ assert users.count().scalar() == 1
+ assert orders.count().scalar() == 2
+
+ del u.orders[0]
+ sess.delete(u)
+ sess.flush()
+ assert users.count().scalar() == 0
+ assert orders.count().scalar() == 0
def test_collection_orphans(self):
sess = create_session()
assert users.count().scalar() == 1
assert orders.count().scalar() == 0
+class O2MCascadeNoOrphanTest(fixtures.FixtureTest):
+ keep_mappers = True
+ keep_data = False
+ refresh_data = False
+
+ def setup_mappers(self):
+ global User, Address, Order, users, orders, addresses
+ from testlib.fixtures import User, Address, Order, users, orders, addresses
+
+ mapper(User, users, properties = dict(
+ orders = relation(
+ mapper(Order, orders), cascade="all")
+ ))
+
+ def test_cascade_delete_noorphans(self):
+ sess = create_session()
+ u = User(name='jack', orders=[Order(description='someorder'), Order(description='someotherorder')])
+ sess.save(u)
+ sess.flush()
+ assert users.count().scalar() == 1
+ assert orders.count().scalar() == 2
+
+ del u.orders[0]
+ sess.delete(u)
+ sess.flush()
+ assert users.count().scalar() == 0
+ assert orders.count().scalar() == 1
+
class M2OCascadeTest(ORMTest):
keep_mappers = True
sess.flush()
self.assertEquals(sess.query(Pref).all(), [Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")])
+class M2OCascadeDeleteTest(ORMTest):
+ keep_mappers = True
+
+ def define_tables(self, metadata):
+ global t1, t2, t3
+ t1 = Table('t1', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)), Column('t2id', Integer, ForeignKey('t2.id')))
+ t2 = Table('t2', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)), Column('t3id', Integer, ForeignKey('t3.id')))
+ t3 = Table('t3', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)))
+
+ def setup_mappers(self):
+ global T1, T2, T3
+ class T1(fixtures.Base):pass
+ class T2(fixtures.Base):pass
+ class T3(fixtures.Base):pass
+
+ mapper(T1, t1, properties={'t2':relation(T2, cascade="all")})
+ mapper(T2, t2, properties={'t3':relation(T3, cascade="all")})
+ mapper(T3, t3)
+
+ def test_cascade_delete(self):
+ sess = create_session()
+
+ x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
+ sess.save(x)
+ sess.flush()
+
+ sess.delete(x)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_cascade_delete_postappend_onelevel(self):
+ sess = create_session()
+
+ x1 = T1(data='t1', )
+ x2 = T2(data='t2')
+ x3 = T3(data='t3')
+ sess.save(x1)
+ sess.save(x2)
+ sess.save(x3)
+ sess.flush()
+
+ sess.delete(x1)
+ x1.t2 = x2
+ x2.t3 = x3
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_cascade_delete_postappend_twolevel(self):
+ sess = create_session()
+
+ x1 = T1(data='t1', t2=T2(data='t2'))
+ x3 = T3(data='t3')
+ sess.save(x1)
+ sess.save(x3)
+ sess.flush()
+
+ sess.delete(x1)
+ x1.t2.t3 = x3
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_preserves_orphans_onelevel(self):
+ sess = create_session()
+
+ x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
+ sess.save(x2)
+ sess.flush()
+ x2.t2 = None
+
+ sess.delete(x2)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [T2()])
+ self.assertEquals(sess.query(T3).all(), [T3()])
+
+ @testing.future
+ def test_preserves_orphans_onelevel_postremove(self):
+ sess = create_session()
+
+ x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
+ sess.save(x2)
+ sess.flush()
+
+ sess.delete(x2)
+ x2.t2 = None
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [T2()])
+ self.assertEquals(sess.query(T3).all(), [T3()])
+
+ def test_preserves_orphans_twolevel(self):
+ sess = create_session()
+
+ x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
+ sess.save(x)
+ sess.flush()
+
+ x.t2.t3 = None
+ sess.delete(x)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [T3()])
+
+class M2OCascadeDeleteOrphanTest(ORMTest):
+ keep_mappers = True
+
+ def define_tables(self, metadata):
+ global t1, t2, t3
+ t1 = Table('t1', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)), Column('t2id', Integer, ForeignKey('t2.id')))
+ t2 = Table('t2', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)), Column('t3id', Integer, ForeignKey('t3.id')))
+ t3 = Table('t3', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)))
+
+ def setup_mappers(self):
+ global T1, T2, T3
+ class T1(fixtures.Base):pass
+ class T2(fixtures.Base):pass
+ class T3(fixtures.Base):pass
+
+ mapper(T1, t1, properties={'t2':relation(T2, cascade="all, delete-orphan")})
+ mapper(T2, t2, properties={'t3':relation(T3, cascade="all, delete-orphan")})
+ mapper(T3, t3)
+
+ def test_cascade_delete(self):
+ sess = create_session()
+
+ x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
+ sess.save(x)
+ sess.flush()
+
+ sess.delete(x)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_deletes_orphans_onelevel(self):
+ sess = create_session()
+
+ x2 = T1(data='t1b', t2=T2(data='t2b', t3=T3(data='t3b')))
+ sess.save(x2)
+ sess.flush()
+ x2.t2 = None
+
+ sess.delete(x2)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_deletes_orphans_twolevel(self):
+ sess = create_session()
+
+ x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
+ sess.save(x)
+ sess.flush()
+
+ x.t2.t3 = None
+ sess.delete(x)
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [])
+ self.assertEquals(sess.query(T2).all(), [])
+ self.assertEquals(sess.query(T3).all(), [])
+
+ def test_finds_orphans_twolevel(self):
+ sess = create_session()
+
+ x = T1(data='t1a', t2=T2(data='t2a', t3=T3(data='t3a')))
+ sess.save(x)
+ sess.flush()
+
+ x.t2.t3 = None
+ sess.flush()
+ self.assertEquals(sess.query(T1).all(), [T1()])
+ self.assertEquals(sess.query(T2).all(), [T2()])
+ self.assertEquals(sess.query(T3).all(), [])
+
class M2MCascadeTest(ORMTest):
def define_tables(self, metadata):
- global a, b, atob
+ global a, b, atob, c
a = Table('a', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30))
atob = Table('atob', metadata,
Column('aid', Integer, ForeignKey('a.id')),
Column('bid', Integer, ForeignKey('b.id'))
-
)
-
+ c = Table('c', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)),
+ Column('bid', Integer, ForeignKey('b.id'))
+ )
+
def test_delete_orphan(self):
class A(fixtures.Base):
pass
assert b.count().scalar() == 0
assert a.count().scalar() == 1
+ def test_delete_orphan_cascades(self):
+ class A(fixtures.Base):
+ pass
+ class B(fixtures.Base):
+ pass
+ class C(fixtures.Base):
+ pass
+
+ mapper(A, a, properties={
+ # if no backref here, delete-orphan failed until [ticket:427] was fixed
+ 'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
+ })
+ mapper(B, b, properties={'cs':relation(C, cascade="all, delete-orphan")})
+ mapper(C, c)
+
+ sess = create_session()
+ b1 = B(data='b1', cs=[C(data='c1')])
+ a1 = A(data='a1', bs=[b1])
+ sess.save(a1)
+ sess.flush()
+
+ a1.bs.remove(b1)
+ sess.flush()
+ assert atob.count().scalar() ==0
+ assert b.count().scalar() == 0
+ assert a.count().scalar() == 1
+ assert c.count().scalar() == 0
+
def test_cascade_delete(self):
class A(fixtures.Base):
pass