if not type in self.cascade:
return
passive = type != 'delete' or self.passive_deletes
- childlist = sessionlib.attribute_manager.get_history(object, self.key, passive=passive)
- if childlist is None:
- return
mapper = self.mapper.primary_mapper()
- for c in childlist.added_items() + childlist.deleted_items() + childlist.unchanged_items():
+ for c in sessionlib.attribute_manager.get_as_list(object, self.key, passive=passive):
if c is not None and c not in recursive and (halt_on is None or not halt_on(c)):
if not isinstance(c, self.mapper.class_):
raise exceptions.AssertionError("Attribute '%s' on class '%s' doesn't handle objects of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__)))
class O2MCascadeTest(testbase.AssertMixin):
def tearDown(self):
- ctx.current.clear()
tables.delete()
def tearDownAll(self):
tables.drop()
def setUpAll(self):
- global ctx, data
- ctx = SessionContext(lambda: create_session( ))
+ global data
tables.create()
mapper(tables.User, tables.users, properties = dict(
address = relation(mapper(tables.Address, tables.addresses), lazy = False, uselist = False, private = True),
}
]
+ sess = create_session()
for elem in data[1:]:
u = tables.User()
- ctx.current.save(u)
+ sess.save(u)
u.user_name = elem['user_name']
u.address = tables.Address()
u.address.email_address = elem['address'][1]['email_address']
i.item_name = item['item_name']
o.items.append(i)
- ctx.current.flush()
- ctx.current.clear()
+ sess.flush()
+ sess.clear()
def testdelete(self):
- l = ctx.current.query(tables.User).select()
+ sess = create_session()
+ l = sess.query(tables.User).select()
for u in l:
self.echo( repr(u.orders))
self.assert_result(l, data[0], *data[1:])
self.echo("\n\n\n")
ids = (l[0].user_id, l[2].user_id)
- ctx.current.delete(l[0])
- ctx.current.delete(l[2])
+ sess.delete(l[0])
+ sess.delete(l[2])
- ctx.current.flush()
+ sess.flush()
self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0)
self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids) &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)
self.assert_(tables.addresses.count(tables.addresses.c.user_id.in_(*ids)).scalar() == 0)
self.assert_(tables.users.count(tables.users.c.user_id.in_(*ids)).scalar() == 0)
+
+ def testcascadecollection(self):
+ """test that cascade only reaches instances that are still part of the collection,
+ not those that have been removed"""
+ sess = create_session()
+
+ u = tables.User()
+ u.user_name = 'newuser'
+ o = tables.Order()
+ o.description = "some description"
+ u.orders.append(o)
+ sess.save(u)
+ sess.flush()
+
+ u.orders.remove(o)
+ sess.delete(u)
+ assert u in sess.deleted
+ assert o not in sess.deleted
def testorphan(self):
- l = ctx.current.query(tables.User).select()
+ sess = create_session()
+ l = sess.query(tables.User).select()
jack = l[1]
jack.orders[:] = []
self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 1)
self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids) &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 2)
- ctx.current.flush()
+ sess.flush()
self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0)
self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids) &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)
assert atob.count().scalar() ==0
assert b.count().scalar() == 0
assert a.count().scalar() == 0
-
+
+
if __name__ == "__main__":
testbase.main()