]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- fixed bug where cascade operations incorrectly included deleted collection
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 30 Jan 2007 00:16:42 +0000 (00:16 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 30 Jan 2007 00:16:42 +0000 (00:16 +0000)
items in the cascade [ticket:445]

CHANGES
lib/sqlalchemy/orm/properties.py
test/orm/cascade.py

diff --git a/CHANGES b/CHANGES
index 44604120eacc76e4a17d9f84ab79a30ab6e6c889..0bba9d1884a836aaaa6f411054294e89db80da60 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -15,6 +15,8 @@
   the relationship.
   - eager loading is slightly more strict about detecting "self-referential"
   relationships, specifically between polymorphic mappers.
+  - fixed bug where cascade operations incorrectly included deleted collection
+  items in the cascade [ticket:445]
 - oracle:
   - when returning "rowid" as the ORDER BY column or in use with ROW_NUMBER OVER,
   oracle dialect checks the selectable its being applied to and will switch to 
index 42c017bc5eb4ecf047dc6d0261d6ac8fe3a9a7d7..09c90c6acc948bc0e1497d85af51ff7da128d6c4 100644 (file)
@@ -151,11 +151,8 @@ class PropertyLoader(StrategizedProperty):
         if not type in self.cascade:
             return
         passive = type != 'delete' or self.passive_deletes
-        childlist = sessionlib.attribute_manager.get_history(object, self.key, passive=passive)
-        if childlist is None:
-            return
         mapper = self.mapper.primary_mapper()
-        for c in childlist.added_items() + childlist.deleted_items() + childlist.unchanged_items():
+        for c in sessionlib.attribute_manager.get_as_list(object, self.key, passive=passive):
             if c is not None and c not in recursive and (halt_on is None or not halt_on(c)):
                 if not isinstance(c, self.mapper.class_):
                     raise exceptions.AssertionError("Attribute '%s' on class '%s' doesn't handle objects of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__)))
index 16453974b777602dd802f0adbe958487b990a4f9..9b2b8528ccea14db816346b639951c18a0f4d978 100644 (file)
@@ -6,7 +6,6 @@ from sqlalchemy import *
 
 class O2MCascadeTest(testbase.AssertMixin):
     def tearDown(self):
-        ctx.current.clear()
         tables.delete()
 
     def tearDownAll(self):
@@ -14,8 +13,7 @@ class O2MCascadeTest(testbase.AssertMixin):
         tables.drop()
 
     def setUpAll(self):
-        global ctx, data
-        ctx = SessionContext(lambda: create_session( ))
+        global data
         tables.create()
         mapper(tables.User, tables.users, properties = dict(
             address = relation(mapper(tables.Address, tables.addresses), lazy = False, uselist = False, private = True),
@@ -52,9 +50,10 @@ class O2MCascadeTest(testbase.AssertMixin):
             }        
         ]
 
+        sess = create_session()
         for elem in data[1:]:
             u = tables.User()
-            ctx.current.save(u)
+            sess.save(u)
             u.user_name = elem['user_name']
             u.address = tables.Address()
             u.address.email_address = elem['address'][1]['email_address']
@@ -70,30 +69,50 @@ class O2MCascadeTest(testbase.AssertMixin):
                     i.item_name = item['item_name']
                     o.items.append(i)
 
-        ctx.current.flush()
-        ctx.current.clear()
+        sess.flush()
+        sess.clear()
 
         
     def testdelete(self):
-        l = ctx.current.query(tables.User).select()
+        sess = create_session()
+        l = sess.query(tables.User).select()
         for u in l:
             self.echo( repr(u.orders))
         self.assert_result(l, data[0], *data[1:])
 
         self.echo("\n\n\n")
         ids = (l[0].user_id, l[2].user_id)
-        ctx.current.delete(l[0])
-        ctx.current.delete(l[2])
+        sess.delete(l[0])
+        sess.delete(l[2])
 
-        ctx.current.flush()
+        sess.flush()
         self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0)
         self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)
         self.assert_(tables.addresses.count(tables.addresses.c.user_id.in_(*ids)).scalar() == 0)
         self.assert_(tables.users.count(tables.users.c.user_id.in_(*ids)).scalar() == 0)
+
+    def testcascadecollection(self):
+        """test that cascade only reaches instances that are still part of the collection,
+        not those that have been removed"""
+        sess = create_session()
+        
+        u = tables.User()
+        u.user_name = 'newuser'
+        o = tables.Order()
+        o.description = "some description"
+        u.orders.append(o)
+        sess.save(u)
+        sess.flush()
+        
+        u.orders.remove(o)
+        sess.delete(u)
+        assert u in sess.deleted
+        assert o not in sess.deleted
     
 
     def testorphan(self):
-        l = ctx.current.query(tables.User).select()
+        sess = create_session()
+        l = sess.query(tables.User).select()
         jack = l[1]
         jack.orders[:] = []
 
@@ -101,7 +120,7 @@ class O2MCascadeTest(testbase.AssertMixin):
         self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 1)
         self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 2)
 
-        ctx.current.flush()
+        sess.flush()
 
         self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0)
         self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)
@@ -243,6 +262,7 @@ class M2MCascadeTest(testbase.AssertMixin):
         assert atob.count().scalar() ==0
         assert b.count().scalar() == 0
         assert a.count().scalar() == 0
-        
+    
+            
 if __name__ == "__main__":
     testbase.main()