]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
clarifying some cascade-based unit tests, adding a little more coverage,
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 4 May 2007 01:54:34 +0000 (01:54 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 4 May 2007 01:54:34 +0000 (01:54 +0000)
and trying to remove unneeded parts of dependency.py cascades.
also de-emphasizing the whole session.flush([oneobject]) thing since i dont really
agree it should be supported

lib/sqlalchemy/orm/dependency.py
lib/sqlalchemy/orm/unitofwork.py
test/orm/cascade.py
test/orm/session.py
test/orm/unitofwork.py

index 063d5aaa9dfd19752fb9bf51e7eb6a9a91c71a9d..e623fd2a1ddbb5a103bae36cab3fd04f347563c1 100644 (file)
@@ -180,9 +180,9 @@ class OneToManyDP(DependencyProcessor):
         if delete:
             # head object is being deleted, and we manage its list of child objects
             # the child objects have to have their foreign key to the parent set to NULL
-            
-            # TODO: this cascade should be "delete" cascade
-            if not self.cascade.delete_orphan or self.post_update:
+            # this phase can be called safely for any cascade but is unnecessary if delete cascade
+            # is on.
+            if not self.cascade.delete or self.post_update:
                 for obj in deplist:
                     childlist = self.get_object_dependencies(obj, uowcommit, passive=self.passive_deletes)
                     if childlist is not None:
@@ -211,25 +211,7 @@ class OneToManyDP(DependencyProcessor):
         if delete:
             # head object is being deleted, and we manage its list of child objects
             # the child objects have to have their foreign key to the parent set to NULL
-            if self.post_update:
-                pass
-            # TODO: this block based on "delete_orphan" should technically be "delete", but also
-            # is entirely not necessary
-            elif self.cascade.delete_orphan:
-                for obj in deplist:
-                    childlist = self.get_object_dependencies(obj, uowcommit, passive=self.passive_deletes)
-                    if childlist is not None:
-                        for child in childlist.deleted_items():
-                            if child is not None and childlist.hasparent(child) is False:
-                                uowcommit.register_object(child, isdelete=True)
-                                for c in self.mapper.cascade_iterator('delete', child):
-                                    uowcommit.register_object(c, isdelete=True)
-                        for child in childlist.unchanged_items():
-                            if child is not None:
-                                uowcommit.register_object(child, isdelete=True)
-                                for c in self.mapper.cascade_iterator('delete', child):
-                                    uowcommit.register_object(c, isdelete=True)
-            else:
+            if not self.post_update and not self.cascade.delete:
                 for obj in deplist:
                     childlist = self.get_object_dependencies(obj, uowcommit, passive=self.passive_deletes)
                     if childlist is not None:
index 732970cad2349b8a2e8aeeb317bd13ddb1dc7fbb..c6b0b2689c13cbdac05a715520c237d63a9e96df 100644 (file)
@@ -192,25 +192,16 @@ class UnitOfWork(object):
         # store objects whose fate has been decided
         processed = util.Set()
 
-
-        # put all saves/updates into the flush context.  detect orphans and throw them into deleted.
+        # put all saves/updates into the flush context.  detect top-level orphans and throw them into deleted.
         for obj in self.new.union(dirty).intersection(objset).difference(self.deleted):
             if obj in processed:
                 continue
-            if object_mapper(obj)._is_orphan(obj):
-                for c in [obj] + list(object_mapper(obj).cascade_iterator('delete', obj)):
-                    if c in processed:
-                        continue
-                    flush_context.register_object(c, isdelete=True)
-                    processed.add(c)
-            else:
-                flush_context.register_object(obj)
-                processed.add(obj)
+
+            flush_context.register_object(obj, isdelete=object_mapper(obj)._is_orphan(obj))
+            processed.add(obj)
 
         # put all remaining deletes into the flush context.
-        for obj in self.deleted:
-            if (objset is not None and not obj in objset) or obj in processed:
-                continue
+        for obj in self.deleted.intersection(objset).difference(processed):
             flush_context.register_object(obj, isdelete=True)
 
         trans = session.create_transaction(autoflush=False)
index 9b2b8528ccea14db816346b639951c18a0f4d978..3fed58dfa8cccbb4f074452f8b2b2ac1d3a2bd31 100644 (file)
@@ -16,12 +16,12 @@ class O2MCascadeTest(testbase.AssertMixin):
         global data
         tables.create()
         mapper(tables.User, tables.users, properties = dict(
-            address = relation(mapper(tables.Address, tables.addresses), lazy = False, uselist = False, private = True),
+            address = relation(mapper(tables.Address, tables.addresses), lazy=True, uselist = False, cascade="all, delete-orphan"),
             orders = relation(
                 mapper(tables.Order, tables.orders, properties = dict (
-                    items = relation(mapper(tables.Item, tables.orderitems), lazy = False, uselist =True, private = True)
+                    items = relation(mapper(tables.Item, tables.orderitems), lazy=True, uselist =True, cascade="all, delete-orphan")
                 )), 
-                lazy = True, uselist = True, private = True)
+                lazy = True, uselist = True, cascade="all, delete-orphan")
         ))
 
     def setUp(self):
@@ -80,16 +80,27 @@ class O2MCascadeTest(testbase.AssertMixin):
             self.echo( repr(u.orders))
         self.assert_result(l, data[0], *data[1:])
 
-        self.echo("\n\n\n")
         ids = (l[0].user_id, l[2].user_id)
         sess.delete(l[0])
         sess.delete(l[2])
 
         sess.flush()
-        self.assert_(tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0)
-        self.assert_(tables.orderitems.count(tables.orders.c.user_id.in_(*ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0)
-        self.assert_(tables.addresses.count(tables.addresses.c.user_id.in_(*ids)).scalar() == 0)
-        self.assert_(tables.users.count(tables.users.c.user_id.in_(*ids)).scalar() == 0)
+        assert tables.orders.count(tables.orders.c.user_id.in_(*ids)).scalar() == 0
+        assert tables.orderitems.count(tables.orders.c.user_id.in_(*ids)  &(tables.orderitems.c.order_id==tables.orders.c.order_id)).scalar() == 0
+        assert tables.addresses.count(tables.addresses.c.user_id.in_(*ids)).scalar() == 0
+        assert tables.users.count(tables.users.c.user_id.in_(*ids)).scalar() == 0
+
+    def testdelete2(self):
+        """test that unloaded collections are still included in a delete-cascade by default."""
+        
+        sess = create_session()
+        u = sess.query(tables.User).get_by(user_name='ed')
+        # assert 'addresses' collection not loaded
+        assert 'addresses' not in u.__dict__
+        sess.delete(u)
+        sess.flush()
+        assert tables.addresses.count(tables.addresses.c.email_address=='foo@bar.com').scalar() == 0
+        assert tables.orderitems.count(tables.orderitems.c.item_name.like('eds%')).scalar() == 0
 
     def testcascadecollection(self):
         """test that cascade only reaches instances that are still part of the collection,
@@ -137,9 +148,13 @@ class M2OCascadeTest(testbase.AssertMixin):
         metadata.drop_all()
         
     def setUpAll(self):
-        global ctx, data, metadata, User, Pref
+        global ctx, data, metadata, User, Pref, Extra
         ctx = SessionContext(create_session)
         metadata = BoundMetaData(testbase.db)
+        extra = Table("extra", metadata, 
+            Column("extra_id", Integer, Sequence("extra_id_seq", optional=True), primary_key=True),
+            Column("prefs_id", Integer, ForeignKey("prefs.prefs_id"))
+        )
         prefs = Table('prefs', metadata, 
             Column('prefs_id', Integer, Sequence('prefs_id_seq', optional=True), primary_key=True),
             Column('prefs_data', String(40)))
@@ -150,43 +165,65 @@ class M2OCascadeTest(testbase.AssertMixin):
             Column('pref_id', Integer, ForeignKey('prefs.prefs_id'))
         )
         class User(object):
-            pass
+            def __init__(self, name):
+                self.user_name = name
         class Pref(object):
+            def __init__(self, data):
+                self.prefs_data = data
+        class Extra(object):
             pass
         metadata.create_all()
+        mapper(Extra, extra)
+        mapper(Pref, prefs, properties=dict(
+            extra = relation(Extra, cascade="all, delete")
+        ))
         mapper(User, users, properties = dict(
-            pref = relation(mapper(Pref, prefs), lazy=False, cascade="all, delete-orphan")
+            pref = relation(Pref, lazy=False, cascade="all, delete-orphan")
         ))
 
     def setUp(self):
-        global data
-        data = [User,
-            {'user_name' : 'ed', 
-                'pref' : (Pref, {'prefs_data' : 'pref 1'}),
-            },
-            {'user_name' : 'jack', 
-                'pref' : (Pref, {'prefs_data' : 'pref 2'}),
-            },
-            {'user_name' : 'foo', 
-                'pref' : (Pref, {'prefs_data' : 'pref 3'}),
-            }        
-        ]
-
-        for elem in data[1:]:
-            u = User()
-            ctx.current.save(u)
-            u.user_name = elem['user_name']
-            u.pref = Pref()
-            u.pref.prefs_data = elem['pref'][1]['prefs_data']
+        u1 = User("ed")
+        u1.pref = Pref("pref 1")
+        u2 = User("jack")
+        u2.pref = Pref("pref 2")
+        u3 = User("foo")
+        u3.pref = Pref("pref 3")
+        u1.pref.extra.append(Extra())
+        u2.pref.extra.append(Extra())
+        u2.pref.extra.append(Extra())
 
+        ctx.current.save(u1)
+        ctx.current.save(u2)
+        ctx.current.save(u3)
         ctx.current.flush()
         ctx.current.clear()
 
     def testorphan(self):
-        l = ctx.current.query(User).select()
-        jack = l[1]
+        jack = ctx.current.query(User).get_by(user_name='jack')
+        p = jack.pref
+        e = jack.pref.extra[0]
         jack.pref = None
         ctx.current.flush()
+        assert p not in ctx.current
+        assert e not in ctx.current
+
+    def testorphan2(self):
+        jack = ctx.current.query(User).get_by(user_name='jack')
+        p = jack.pref
+        e = jack.pref.extra[0]
+        ctx.current.clear()
+
+        jack.pref = None
+        ctx.current.update(jack)
+        ctx.current.update(p)
+        ctx.current.update(e)
+        assert p in ctx.current
+        assert e in ctx.current
+        ctx.current.flush()
+        assert p not in ctx.current
+        assert e not in ctx.current
+    
+        
 
 class M2MCascadeTest(testbase.AssertMixin):
     def setUpAll(self):
@@ -262,6 +299,193 @@ class M2MCascadeTest(testbase.AssertMixin):
         assert atob.count().scalar() ==0
         assert b.count().scalar() == 0
         assert a.count().scalar() == 0
+
+class UnsavedOrphansTest(testbase.ORMTest):
+    """tests regarding pending entities that are orphans"""
+    
+    def define_tables(self, metadata):
+        global users, addresses, User, Address
+        users = Table('users', metadata,
+            Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),
+            Column('user_name', String(40)),
+        )
+
+        addresses = Table('email_addresses', metadata,
+            Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True),
+            Column('user_id', Integer, ForeignKey(users.c.user_id)),
+            Column('email_address', String(40)),
+        )
+        class User(object):pass
+        class Address(object):pass
+        
+    def test_pending_orphan(self):
+        """test that an entity that never had a parent on a delete-orphan cascade cant be saved."""
+
+        mapper(Address, addresses)
+        mapper(User, users, properties=dict(
+            addresses=relation(Address, cascade="all,delete-orphan", backref="user")
+        ))
+        s = create_session()
+        a = Address()
+        s.save(a)
+        try:
+            s.flush()
+        except exceptions.FlushError, e:
+            pass
+        assert a.address_id is None, "Error: address should not be persistent"
+
+    def test_delete_new_object(self):
+        """test that an entity which is attached then detached from its 
+        parent with a delete-orphan cascade gets counted as an orphan"""
+
+        mapper(Address, addresses)
+        mapper(User, users, properties=dict(
+            addresses=relation(Address, cascade="all,delete-orphan", backref="user")
+        ))
+        s = create_session()
+
+        u = User()
+        s.save(u)
+        a = Address()
+        assert a not in s.new
+        u.addresses.append(a)
+        u.addresses.remove(a)
+        s.delete(u)
+        try:
+            s.flush() # (erroneously) causes "a" to be persisted
+            assert False
+        except exceptions.FlushError:
+            assert True
+        assert u.user_id is None, "Error: user should not be persistent"
+        assert a.address_id is None, "Error: address should not be persistent"
+
+
+class UnsavedOrphansTest2(testbase.ORMTest):
+    """same test as UnsavedOrphans only three levels deep"""
+
+    def define_tables(self, meta):
+        global orders, items, attributes
+        orders = Table('orders', meta,
+            Column('id', Integer, Sequence('order_id_seq'), primary_key = True),
+            Column('name', VARCHAR(50)),
+
+        )
+        items = Table('items', meta,
+            Column('id', Integer, Sequence('item_id_seq'), primary_key = True),
+            Column('order_id', Integer, ForeignKey(orders.c.id), nullable=False),
+            Column('name', VARCHAR(50)),
+
+        )
+        attributes = Table('attributes', meta,
+            Column('id', Integer, Sequence('attribute_id_seq'), primary_key = True),
+            Column('item_id', Integer, ForeignKey(items.c.id), nullable=False),
+            Column('name', VARCHAR(50)),
+
+        )
+
+    def testdeletechildwithchild(self):
+        """test that an entity which is attached then detached from its 
+        parent with a delete-orphan cascade gets counted as an orphan, as well
+        as its own child instances"""
+
+        class Order(object): pass
+        class Item(object): pass
+        class Attribute(object): pass
+
+        attrMapper = mapper(Attribute, attributes)
+        itemMapper = mapper(Item, items, properties=dict(
+            attributes=relation(attrMapper, cascade="all,delete-orphan", backref="item")
+        ))
+        orderMapper = mapper(Order, orders, properties=dict(
+            items=relation(itemMapper, cascade="all,delete-orphan", backref="order")
+        ))
+
+        s = create_session( )
+        order = Order()
+        s.save(order)
+
+        item = Item()
+        attr = Attribute()
+        item.attributes.append(attr)
+
+        order.items.append(item)
+        order.items.remove(item) # item is an orphan, but attr is not so flush() tries to save attr
+        try:
+            s.flush()
+            assert False
+        except exceptions.FlushError, e:
+            print e
+            assert True
+
+        assert item.id is None
+        assert attr.id is None
+
+class DoubleParentOrphanTest(testbase.AssertMixin):
+    """test orphan detection for an entity with two parent relations"""
+    
+    def setUpAll(self):
+        global metadata, address_table, businesses, homes
+        metadata = BoundMetaData(testbase.db)
+        address_table = Table('addresses', metadata,
+            Column('address_id', Integer, primary_key=True),
+            Column('street', String(30)),
+        )
+
+        homes = Table('homes', metadata,
+            Column('home_id', Integer, primary_key=True),
+            Column('description', String(30)),
+            Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False),
+        )
+
+        businesses = Table('businesses', metadata,
+            Column('business_id', Integer, primary_key=True, key="id"),
+            Column('description', String(30), key="description"),
+            Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False),
+        )
+        metadata.create_all()
+    def tearDown(self):
+        clear_mappers()
+    def tearDownAll(self):
+        metadata.drop_all()
+    def test_non_orphan(self):
+        """test that an entity can have two parent delete-orphan cascades, and persists normally."""
+        
+        class Address(object):pass
+        class Home(object):pass
+        class Business(object):pass
+        mapper(Address, address_table)
+        mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
+        mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
+
+        session = create_session()
+        a1 = Address()
+        a2 = Address()
+        h1 = Home()
+        b1 = Business()
+        h1.address = a1
+        b1.address = a2
+        [session.save(x) for x in [h1,b1]]
+        session.flush()
+        
+    def test_orphan(self):
+        """test that an entity can have two parent delete-orphan cascades, and is detected as an orphan
+        when saved without a parent."""
+
+        class Address(object):pass
+        class Home(object):pass
+        class Business(object):pass
+        mapper(Address, address_table)
+        mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
+        mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
+
+        session = create_session()
+        a1 = Address()
+        session.save(a1)
+        try:
+            session.flush()
+            assert False
+        except exceptions.FlushError, e:
+            assert True
     
             
 if __name__ == "__main__":
index 705e56e96c627966b76c241665cab0e4fcc02d82..d659d834bd2ff47318b05117577749ade2902d6e 100644 (file)
@@ -211,174 +211,6 @@ class SessionTest(AssertMixin):
         key = s.identity_key(User, row=row, entity_name="en")
         self._assert_key(key, (User, (1,), "en"))
         
-class OrphanDeletionTest(AssertMixin):
-
-    def setUpAll(self):
-        tables.create()
-        tables.data()
-    def tearDownAll(self):
-        tables.drop()
-    def tearDown(self):
-        clear_mappers()
-    def setUp(self):
-        pass
-
-    def test_orphan(self):
-        mapper(Address, addresses)
-        mapper(User, users, properties=dict(
-            addresses=relation(Address, cascade="all,delete-orphan", backref="user")
-        ))
-        s = create_session()
-        a = Address()
-        s.save(a)
-        try:
-            s.flush()
-        except exceptions.FlushError, e:
-            pass
-        assert a.address_id is None, "Error: address should not be persistent"
-        
-    def test_delete_new_object(self):
-        mapper(Address, addresses)
-        mapper(User, users, properties=dict(
-            addresses=relation(Address, cascade="all,delete-orphan", backref="user")
-        ))
-        s = create_session()
-
-        u = User()
-        s.save(u)
-        a = Address()
-        assert a not in s.new
-        u.addresses.append(a)
-        u.addresses.remove(a)
-        s.delete(u)
-        try:
-            s.flush() # (erroneously) causes "a" to be persisted
-            assert False
-        except exceptions.FlushError:
-            assert True
-        assert u.user_id is None, "Error: user should not be persistent"
-        assert a.address_id is None, "Error: address should not be persistent"
-
-
-class CascadingOrphanDeletionTest(AssertMixin):
-    def setUpAll(self):
-        global meta, orders, items, attributes
-        meta = BoundMetaData(db)
-
-        orders = Table('orders', meta,
-            Column('id', Integer, Sequence('order_id_seq'), primary_key = True),
-            Column('name', VARCHAR(50)),
-
-        )
-        items = Table('items', meta,
-            Column('id', Integer, Sequence('item_id_seq'), primary_key = True),
-            Column('order_id', Integer, ForeignKey(orders.c.id), nullable=False),
-            Column('name', VARCHAR(50)),
-
-        )
-        attributes = Table('attributes', meta,
-            Column('id', Integer, Sequence('attribute_id_seq'), primary_key = True),
-            Column('item_id', Integer, ForeignKey(items.c.id), nullable=False),
-            Column('name', VARCHAR(50)),
-
-        )
-
-    def setUp(self):
-        meta.create_all()
-    def tearDown(self):
-        meta.drop_all()
-
-    def testdeletechildwithchild(self):
-        class Order(object): pass
-        class Item(object): pass
-        class Attribute(object): pass
-
-        attrMapper = mapper(Attribute, attributes)
-        itemMapper = mapper(Item, items, properties=dict(
-            attributes=relation(attrMapper, cascade="all,delete-orphan", backref="item")
-        ))
-        orderMapper = mapper(Order, orders, properties=dict(
-            items=relation(itemMapper, cascade="all,delete-orphan", backref="order")
-        ))
-
-        s = create_session( )
-        order = Order()
-        s.save(order)
-
-        item = Item()
-        attr = Attribute()
-        item.attributes.append(attr)
-
-        order.items.append(item)
-        order.items.remove(item) # item is an orphan, but attr is not so flush() tries to save attr
-        try:
-            s.flush()
-            assert False
-        except exceptions.FlushError, e:
-            print e
-            assert True
-
-        assert item.id is None
-        assert attr.id is None
-
-class DoubleOrphanTest(testbase.AssertMixin):
-    def setUpAll(self):
-        global metadata, address_table, businesses, homes
-        metadata = BoundMetaData(testbase.db)
-        address_table = Table('addresses', metadata,
-            Column('address_id', Integer, primary_key=True),
-            Column('street', String(30)),
-        )
-
-        homes = Table('homes', metadata,
-            Column('home_id', Integer, primary_key=True),
-            Column('description', String(30)),
-            Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False),
-        )
-
-        businesses = Table('businesses', metadata,
-            Column('business_id', Integer, primary_key=True, key="id"),
-            Column('description', String(30), key="description"),
-            Column('address_id', Integer, ForeignKey('addresses.address_id'), nullable=False),
-        )
-        metadata.create_all()
-    def tearDown(self):
-        clear_mappers()
-    def tearDownAll(self):
-        metadata.drop_all()
-    def test_non_orphan(self):
-        class Address(object):pass
-        class Home(object):pass
-        class Business(object):pass
-        mapper(Address, address_table)
-        mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
-        mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
-        
-        session = create_session()
-        a1 = Address()
-        a2 = Address()
-        h1 = Home()
-        b1 = Business()
-        h1.address = a1
-        b1.address = a2
-        [session.save(x) for x in [h1,b1]]
-        session.flush()
-    def test_orphan(self):
-        class Address(object):pass
-        class Home(object):pass
-        class Business(object):pass
-        mapper(Address, address_table)
-        mapper(Home, homes, properties={'address':relation(Address, cascade="all,delete-orphan")})
-        mapper(Business, businesses, properties={'address':relation(Address, cascade="all,delete-orphan")})
-        
-        session = create_session()
-        a1 = Address()
-        session.save(a1)
-        try:
-            session.flush()
-            assert False
-        except exceptions.FlushError, e:
-            assert True
         
 if __name__ == "__main__":    
     testbase.main()
index f4da9cf61f2eabebf4471facfb2e04124ac3a064..62b1db8604124023a0bfc0f0b1569382e42a986c 100644 (file)
@@ -513,82 +513,6 @@ class PassiveDeletesTest(UnitOfWorkTest):
         
 
         
-class PrivateAttrTest(UnitOfWorkTest):
-    """tests various things to do with private=True mappers"""
-    def setUpAll(self):
-        UnitOfWorkTest.setUpAll(self)
-        global a_table, b_table
-        a_table = Table('a',testbase.db,
-            Column('a_id', Integer, Sequence('next_a_id'), primary_key=True),
-            Column('data', String(10)),
-            )
-    
-        b_table = Table('b',testbase.db,
-            Column('b_id',Integer,Sequence('next_b_id'),primary_key=True),
-            Column('a_id',Integer,ForeignKey('a.a_id')),
-            Column('data',String(10)))
-        a_table.create()
-        b_table.create()
-    def tearDownAll(self):
-        b_table.drop()
-        a_table.drop()
-        UnitOfWorkTest.tearDownAll(self)
-    
-    def testsinglecommit(self):
-        """tests that a commit of a single object deletes private relationships"""
-        class A(object):pass
-        class B(object):pass
-    
-        assign_mapper(B,b_table)
-        assign_mapper(A,a_table,properties= {'bs' : relation(B.mapper,private=True)})
-    
-        # create some objects
-        a = A(data='a1')
-        a.bs = []
-    
-        # add a 'B' instance
-        b1 = B(data='1111')
-        a.bs.append(b1)
-    
-        # add another one
-        b2 = B(data='2222')
-        a.bs.append(b2)
-    
-        # inserts both A and Bs
-        ctx.current.flush([a])
-    
-        ctx.current.delete(a)
-        print ctx.current.deleted
-        ctx.current.flush([a])
-#        ctx.current.flush()
-        
-        assert b_table.count().scalar() == 0
-
-    def testswitchparent(self):
-        """tests that you can switch the parent of an object in a backref scenario"""
-        class A(object):pass
-        class B(object):pass
-    
-        assign_mapper(B,b_table)
-        assign_mapper(A,a_table,properties= {
-            'bs' : relation (B.mapper,private=True, backref='a')}
-        )
-        a1 = A(data='testa1')
-        a2 = A(data='testa2')
-        b = B(data='testb')
-        b.a = a1
-        ctx.current.flush()
-        ctx.current.clear()
-        sess = ctx.current
-        a1 = Query(A).get(a1.a_id)
-        a2 = Query(A).get(a2.a_id)
-        assert a1.bs[0].a is a1
-        b = a1.bs[0]
-        b.a = a2
-        assert b not in sess.deleted
-        ctx.current.flush()
-        assert b in sess.identity_map.values()
-                
 class DefaultTest(UnitOfWorkTest):
     """tests that when saving objects whose table contains DefaultGenerators, either python-side, preexec or database-side,
     the newly saved instances receive all the default values either through a post-fetch or getting the pre-exec'ed