]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- some cleanup to the unitofwork test suite (needs much more)
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 15 Feb 2007 02:07:06 +0000 (02:07 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 15 Feb 2007 02:07:06 +0000 (02:07 +0000)
- fixed relationship deletion error when one-to-many child item is moved to a new
  parent in a single unit of work [ticket:478]

CHANGES
lib/sqlalchemy/orm/dependency.py
test/orm/unitofwork.py
test/tables.py

diff --git a/CHANGES b/CHANGES
index dd4f897bb266712168334b1ab59114f5516f59b5..6143acde0a8e8be8ae04dd25b3c0aedafa04e885 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,8 @@
   - contains_eager('foo') automatically implies eagerload('foo')
   - fixed bug where cascade operations incorrectly included deleted collection
   items in the cascade [ticket:445]
+  - fixed relationship deletion error when one-to-many child item is moved to a new 
+  parent in a single unit of work [ticket:478]
   - fixed relationship deletion error where parent/child with a single column as PK/FK 
   on the child would raise a "blank out the primary key" error, if manually deleted
   or "delete" cascade without "delete-orphan" was used
index 4e4385ef12a21c2b2e1778297842e71616228935..a8d8ad507b911f5c4445d96d589bc553852f6731 100644 (file)
@@ -41,6 +41,10 @@ class DependencyProcessor(object):
 
         self._compile_synchronizers()
 
+    def _get_instrumented_attribute(self):
+        """return the InstrumentedAttribute handled by this DependencyProecssor"""
+        return getattr(self.parent.class_, self.key)
+        
     def register_dependencies(self, uowcommit):
         """tells a UOWTransaction what mappers are dependent on which, with regards
         to the two or three mappers handled by this PropertyLoader.
@@ -148,7 +152,7 @@ class OneToManyDP(DependencyProcessor):
                         self._synchronize(obj, child, None, False, uowcommit)
                         self._conditional_post_update(child, uowcommit, [obj])
                     for child in childlist.deleted_items():
-                        if not self.cascade.delete_orphan:
+                        if not self.cascade.delete_orphan and not self._get_instrumented_attribute().hasparent(child):
                             self._synchronize(obj, child, None, True, uowcommit)
 
     def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
index 5539df6b71496c93b38044b7b5ee35e5f109897c..b88325a7ff441cced48e3f28bd5c22c88aa1230b 100644 (file)
@@ -670,7 +670,216 @@ class DefaultTest(UnitOfWorkTest):
         h1.counter = 19
         ctx.current.flush()
         self.assert_(h1.foober == 'im the update')
-        
+
+class OneToManyTest(UnitOfWorkTest):
+    def setUpAll(self):
+        UnitOfWorkTest.setUpAll(self)
+        tables.create()
+    def tearDownAll(self):
+        tables.drop()
+        UnitOfWorkTest.tearDownAll(self)
+    def tearDown(self):
+        tables.delete()
+        UnitOfWorkTest.tearDown(self)
+
+    def testonetomany_1(self):
+        """test basic save of one to many."""
+        m = mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy = True)
+        ))
+        u = User()
+        u.user_name = 'one2manytester'
+        u.addresses = []
+        a = Address()
+        a.email_address = 'one2many@test.org'
+        u.addresses.append(a)
+        a2 = Address()
+        a2.email_address = 'lala@test.org'
+        u.addresses.append(a2)
+        self.echo( repr(u.addresses))
+        self.echo( repr(u.addresses.added_items()))
+        ctx.current.flush()
+
+        usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
+        self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
+        addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
+        self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
+        self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+
+        userid = u.user_id
+        addressid = a2.address_id
+
+        a2.email_address = 'somethingnew@foo.com'
+
+        ctx.current.flush()
+
+        addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
+        self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
+        self.assert_(u.user_id == userid and a2.address_id == addressid)
+
+    def testonetomany_2(self):
+        """digs deeper into modifying the child items of an object to insure the correct
+        updates take place"""
+        m = mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy = True)
+        ))
+        u1 = User()
+        u1.user_name = 'user1'
+        u1.addresses = []
+        a1 = Address()
+        a1.email_address = 'emailaddress1'
+        u1.addresses.append(a1)
+        u2 = User()
+        u2.user_name = 'user2'
+        u2.addresses = []
+        a2 = Address()
+        a2.email_address = 'emailaddress2'
+        u2.addresses.append(a2)
+
+        a3 = Address()
+        a3.email_address = 'emailaddress3'
+
+        ctx.current.flush()
+
+        # modify user2 directly, append an address to user1.
+        # upon commit, user2 should be updated, user1 should not
+        # both address1 and address3 should be updated
+        u2.user_name = 'user2modified'
+        u1.addresses.append(a3)
+        del u1.addresses[0]
+        self.assert_sql(db, lambda: ctx.current.flush(), 
+                [
+                    (
+                        "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
+                        {'users_user_id': u2.user_id, 'user_name': 'user2modified'}
+                    ),
+                    ("UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
+                        {'user_id': None, 'email_addresses_address_id': a1.address_id}
+                    ),
+                    (
+                        "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
+                        {'user_id': u1.user_id, 'email_addresses_address_id': a3.address_id}
+                    ),
+                ])
+
+    def testchildmove(self):
+        """tests moving a child from one parent to the other, then deleting the first parent, properly
+        updates the child with the new parent.  this tests the 'trackparent' option in the attributes module."""
+        m = mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy = True)
+        ))
+        u1 = User()
+        u1.user_name = 'user1'
+        u2 = User()
+        u2.user_name = 'user2'
+        a = Address()
+        a.email_address = 'address1'
+        u1.addresses.append(a)
+        ctx.current.flush()
+        del u1.addresses[0]
+        u2.addresses.append(a)
+        ctx.current.delete(u1)
+        ctx.current.flush()
+        ctx.current.clear()
+        u2 = ctx.current.get(User, u2.user_id)
+        assert len(u2.addresses) == 1
+
+    def testchildmove_2(self):
+        m = mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy = True)
+        ))
+        u1 = User()
+        u1.user_name = 'user1'
+        u2 = User()
+        u2.user_name = 'user2'
+        a = Address()
+        a.email_address = 'address1'
+        u1.addresses.append(a)
+        ctx.current.flush()
+        del u1.addresses[0]
+        u2.addresses.append(a)
+        ctx.current.flush()
+        ctx.current.clear()
+        u2 = ctx.current.get(User, u2.user_id)
+        assert len(u2.addresses) == 1
+
+    def testo2mdeleteparent(self):
+        m = mapper(User, users, properties = dict(
+            address = relation(mapper(Address, addresses), lazy = True, uselist = False, private = False)
+        ))
+        u = User()
+        a = Address()
+        u.user_name = 'one2onetester'
+        u.address = a
+        u.address.email_address = 'myonlyaddress@foo.com'
+        ctx.current.flush()
+        ctx.current.delete(u)
+        ctx.current.flush()
+        self.assert_(a.address_id is not None and a.user_id is None and not ctx.current.identity_map.has_key(u._instance_key) and ctx.current.identity_map.has_key(a._instance_key))
+
+    def testonetoone(self):
+        m = mapper(User, users, properties = dict(
+            address = relation(mapper(Address, addresses), lazy = True, uselist = False)
+        ))
+        u = User()
+        u.user_name = 'one2onetester'
+        u.address = Address()
+        u.address.email_address = 'myonlyaddress@foo.com'
+        ctx.current.flush()
+        u.user_name = 'imnew'
+        ctx.current.flush()
+        u.address.email_address = 'imnew@foo.com'
+        ctx.current.flush()
+
+    def testbidirectional(self):
+        m1 = mapper(User, users)
+
+        m2 = mapper(Address, addresses, properties = dict(
+            user = relation(m1, lazy = False, backref='addresses')
+        ))
+
+
+        u = User()
+        print repr(u.addresses)
+        u.user_name = 'test'
+        a = Address()
+        a.email_address = 'testaddress'
+        a.user = u
+        ctx.current.flush()
+        print repr(u.addresses)
+        x = False
+        try:
+            u.addresses.append('hi')
+            x = True
+        except:
+            pass
+
+        if x:
+            self.assert_(False, "User addresses element should be scalar based")
+
+        ctx.current.delete(u)
+        ctx.current.flush()
+
+    def testdoublerelation(self):
+        m2 = mapper(Address, addresses)
+        m = mapper(User, users, properties={
+            'boston_addresses' : relation(m2, primaryjoin=
+                        and_(users.c.user_id==Address.c.user_id, 
+                        Address.c.email_address.like('%boston%'))),
+            'newyork_addresses' : relation(m2, primaryjoin=
+                        and_(users.c.user_id==Address.c.user_id, 
+                        Address.c.email_address.like('%newyork%'))),
+        })
+        u = User()
+        a = Address()
+        a.email_address = 'foo@boston.com'
+        b = Address()
+        b.email_address = 'bar@newyork.com'
+
+        u.boston_addresses.append(a)
+        u.newyork_addresses.append(b)
+        ctx.current.flush()
+
 class SaveTest(UnitOfWorkTest):
 
     def setUpAll(self):
@@ -693,9 +902,6 @@ class SaveTest(UnitOfWorkTest):
 
     def tearDown(self):
         tables.delete()
-
-        #self.assert_(len(ctx.current.new) == 0)
-        #self.assert_(len(ctx.current.dirty) == 0)
         UnitOfWorkTest.tearDown(self)
 
     def testbasic(self):
@@ -857,35 +1063,6 @@ class SaveTest(UnitOfWorkTest):
         assert addresses.count().scalar() == 0
         
             
-    def testm2mmultitable(self):
-        # many-to-many join on an association table
-        j = join(users, userkeywords, 
-                users.c.user_id==userkeywords.c.user_id).join(keywords, 
-                   userkeywords.c.keyword_id==keywords.c.keyword_id)
-
-        # a class 
-        class KeywordUser(object):
-            pass
-
-        # map to it - the identity of a KeywordUser object will be
-        # (user_id, keyword_id) since those are the primary keys involved
-        m = mapper(KeywordUser, j, properties={
-            'user_id':[users.c.user_id, userkeywords.c.user_id],
-            'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
-            'keyword_name':keywords.c.name
-            
-        })
-        
-        k = KeywordUser()
-        k.user_name = 'keyworduser'
-        k.keyword_name = 'a keyword'
-        ctx.current.flush()
-        print m.instance_key(k)
-        id = (k.user_id, k.keyword_id)
-        ctx.current.clear()
-        k = ctx.current.query(KeywordUser).get(id)
-        assert k.user_name == 'keyworduser'
-        assert k.keyword_name == 'a keyword'
     
     def testbatchmode(self):
         class TestExtension(MapperExtension):
@@ -913,63 +1090,19 @@ class SaveTest(UnitOfWorkTest):
         except AssertionError:
             assert True
         
-    def testonetoone(self):
-        m = mapper(User, users, properties = dict(
-            address = relation(mapper(Address, addresses), lazy = True, uselist = False)
-        ))
-        u = User()
-        u.user_name = 'one2onetester'
-        u.address = Address()
-        u.address.email_address = 'myonlyaddress@foo.com'
-        ctx.current.flush()
-        u.user_name = 'imnew'
-        ctx.current.flush()
-        u.address.email_address = 'imnew@foo.com'
-        ctx.current.flush()
-
-    def testchildmove(self):
-        """tests moving a child from one parent to the other, then deleting the first parent, properly
-        updates the child with the new parent.  this tests the 'trackparent' option in the attributes module."""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True)
-        ))
-        u1 = User()
-        u1.user_name = 'user1'
-        u2 = User()
-        u2.user_name = 'user2'
-        a = Address()
-        a.email_address = 'address1'
-        u1.addresses.append(a)
-        ctx.current.flush()
-        del u1.addresses[0]
-        u2.addresses.append(a)
-        ctx.current.delete(u1)
-        ctx.current.flush()
-        ctx.current.clear()
-        u2 = ctx.current.get(User, u2.user_id)
-        assert len(u2.addresses) == 1
-    
-    def testdelete(self):
-        m = mapper(User, users, properties = dict(
-            address = relation(mapper(Address, addresses), lazy = True, uselist = False, private = False)
-        ))
-        u = User()
-        a = Address()
-        u.user_name = 'one2onetester'
-        u.address = a
-        u.address.email_address = 'myonlyaddress@foo.com'
-        ctx.current.flush()
-        self.echo("\n\n\n")
-        ctx.current.delete(u)
-        ctx.current.flush()
-        self.assert_(a.address_id is not None and a.user_id is None and not ctx.current.identity_map.has_key(u._instance_key) and ctx.current.identity_map.has_key(a._instance_key))
     
+class ManyToOneTest(UnitOfWorkTest):
+    def setUpAll(self):
+        UnitOfWorkTest.setUpAll(self)
+        tables.create()
+    def tearDownAll(self):
+        tables.drop()
+        UnitOfWorkTest.tearDownAll(self)
+    def tearDown(self):
+        tables.delete()
+        UnitOfWorkTest.tearDown(self)
     
-    def testbackwardsonetoone(self):
-        # test 'backwards'
-#        m = mapper(Address, addresses, properties = dict(
-#            user = relation(User, users, foreignkey = addresses.c.user_id, primaryjoin = users.c.user_id == addresses.c.user_id, lazy = True, uselist = False)
-#        ))
+    def testm2oonetoone(self):
         # TODO: put assertion in here !!!
         m = mapper(Address, addresses, properties = dict(
             user = relation(mapper(User, users), lazy = True, uselist = False)
@@ -1023,148 +1156,99 @@ class SaveTest(UnitOfWorkTest):
                 },
                 
         ])
-        l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
-        self.echo( repr(l.fetchone().values()))
+        l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.user_id, addresses.c.address_id==a.address_id)).execute()
+        assert l.fetchone().values() == [a.user.user_id, 'asdf8d', a.address_id, a.user_id, 'theater@foo.com']
 
-        
 
-    def testonetomany(self):
-        """test basic save of one to many."""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True)
+    def testmanytoone_1(self):
+        m = mapper(Address, addresses, properties = dict(
+            user = relation(mapper(User, users), lazy = True)
         ))
-        u = User()
-        u.user_name = 'one2manytester'
-        u.addresses = []
-        a = Address()
-        a.email_address = 'one2many@test.org'
-        u.addresses.append(a)
-        a2 = Address()
-        a2.email_address = 'lala@test.org'
-        u.addresses.append(a2)
-        self.echo( repr(u.addresses))
-        self.echo( repr(u.addresses.added_items()))
-        ctx.current.flush()
-
-        usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
-        self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
-        addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
-        self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
-        self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
-
-        userid = u.user_id
-        addressid = a2.address_id
+        a1 = Address()
+        a1.email_address = 'emailaddress1'
+        u1 = User()
+        u1.user_name='user1'
         
-        a2.email_address = 'somethingnew@foo.com'
-
+        a1.user = u1
         ctx.current.flush()
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        assert a1.user is u1
 
-        
-        addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
-        self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
-        self.assert_(u.user_id == userid and a2.address_id == addressid)
-
-    def testmapperswitch(self):
-        """test that, if we change mappers, the new one gets used fully. """
-        users.insert().execute(
-            dict(user_id = 7, user_name = 'jack'),
-            dict(user_id = 8, user_name = 'ed'),
-            dict(user_id = 9, user_name = 'fred')
-        )
-
-        # mapper with just users table
-        assign_mapper(User, users)
-        ctx.current.query(User).select()
-        oldmapper = User.mapper
-        # now a mapper with the users table plus a relation to the addresses
-        clear_mapper(User.mapper)
-        assign_mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False)
-        ))
-        self.assert_(oldmapper is not User.mapper)
-        u = ctx.current.query(User).select()
-        u[0].addresses.append(Address())
-        u[0].addresses[0].email_address='hi'
-        
-        # insure that upon commit, the new mapper with the address relation is used
-        self.assert_sql(db, lambda: ctx.current.flush(), 
-                [
-                    (
-                    "INSERT INTO email_addresses (user_id, email_address) VALUES (:user_id, :email_address)",
-                    {'email_address': 'hi', 'user_id': 7}
-                    ),
-                ],
-                with_sequences=[
-                    (
-                    "INSERT INTO email_addresses (address_id, user_id, email_address) VALUES (:address_id, :user_id, :email_address)",
-                    lambda ctx:{'email_address': 'hi', 'user_id': 7, 'address_id':ctx.last_inserted_ids()[0]}
-                    ),
-                ]
-        )
+        a1.user = None
+        ctx.current.flush()
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        assert a1.user is None
 
-    def testchildmanipulations(self):
-        """digs deeper into modifying the child items of an object to insure the correct
-        updates take place"""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True)
+    def testmanytoone_2(self):
+        m = mapper(Address, addresses, properties = dict(
+            user = relation(mapper(User, users), lazy = True)
         ))
-        u1 = User()
-        u1.user_name = 'user1'
-        u1.addresses = []
         a1 = Address()
         a1.email_address = 'emailaddress1'
-        u1.addresses.append(a1)
-        u2 = User()
-        u2.user_name = 'user2'
-        u2.addresses = []
         a2 = Address()
         a2.email_address = 'emailaddress2'
-        u2.addresses.append(a2)
-
-        a3 = Address()
-        a3.email_address = 'emailaddress3'
+        u1 = User()
+        u1.user_name='user1'
 
+        a1.user = u1
         ctx.current.flush()
-        
-        self.echo("\n\n\n")
-        # modify user2 directly, append an address to user1.
-        # upon commit, user2 should be updated, user1 should not
-        # both address1 and address3 should be updated
-        u2.user_name = 'user2modified'
-        u1.addresses.append(a3)
-        del u1.addresses[0]
-        self.assert_sql(db, lambda: ctx.current.flush(), 
-                [
-                    (
-                        "UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
-                        {'users_user_id': u2.user_id, 'user_name': 'user2modified'}
-                    ),
-                    ("UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
-                        {'user_id': None, 'email_addresses_address_id': a1.address_id}
-                    ),
-                    (
-                        "UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id",
-                        {'user_id': u1.user_id, 'email_addresses_address_id': a3.address_id}
-                    ),
-                ])
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        a2 = ctx.current.query(Address).get(a2.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        assert a1.user is u1
+        a1.user = None
+        a2.user = u1
+        ctx.current.flush()
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        a2 = ctx.current.query(Address).get(a2.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        assert a1.user is None
+        assert a2.user is u1
 
-    def testbackwardsmanipulations(self):
+    def testmanytoone_3(self):
         m = mapper(Address, addresses, properties = dict(
-            user = relation(mapper(User, users), lazy = True, uselist = False)
+            user = relation(mapper(User, users), lazy = True)
         ))
         a1 = Address()
         a1.email_address = 'emailaddress1'
         u1 = User()
         u1.user_name='user1'
-        
+        u2 = User()
+        u2.user_name='user2'
+
         a1.user = u1
         ctx.current.flush()
-
-        self.echo("\n\n\n")
-        ctx.current.delete(u1)
-        a1.user = None
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        u2 = ctx.current.query(User).get(u2.user_id)
+        assert a1.user is u1
+        
+        a1.user = u2
         ctx.current.flush()
-
+        ctx.current.clear()
+        a1 = ctx.current.query(Address).get(a1.address_id)
+        u1 = ctx.current.query(User).get(u1.user_id)
+        u2 = ctx.current.query(User).get(u2.user_id)
+        assert a1.user is u2
+        
+class ManyToManyTest(UnitOfWorkTest):
+    def setUpAll(self):
+        UnitOfWorkTest.setUpAll(self)
+        tables.create()
+    def tearDownAll(self):
+        tables.drop()
+        UnitOfWorkTest.tearDownAll(self)
+    def tearDown(self):
+        tables.delete()
+        UnitOfWorkTest.tearDown(self)
+        
     def testmanytomany(self):
         items = orderitems
 
@@ -1374,54 +1458,36 @@ class SaveTest(UnitOfWorkTest):
         l = Query(m).select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
         self.assert_result(l, *data)
 
-    def testbidirectional(self):
-        m1 = mapper(User, users)
-        
-        m2 = mapper(Address, addresses, properties = dict(
-            user = relation(m1, lazy = False, backref='addresses')
-        ))
-        
-        u = User()
-        print repr(u.addresses)
-        u.user_name = 'test'
-        a = Address()
-        a.email_address = 'testaddress'
-        a.user = u
-        ctx.current.flush()
-        print repr(u.addresses)
-        x = False
-        try:
-            u.addresses.append('hi')
-            x = True
-        except:
+    def testm2mmultitable(self):
+        # many-to-many join on an association table
+        j = join(users, userkeywords, 
+                users.c.user_id==userkeywords.c.user_id).join(keywords, 
+                   userkeywords.c.keyword_id==keywords.c.keyword_id)
+
+        # a class 
+        class KeywordUser(object):
             pass
-            
-        if x:
-            self.assert_(False, "User addresses element should be scalar based")
-        
-        ctx.current.delete(u)
-        ctx.current.flush()
 
-    def testdoublerelation(self):
-        m2 = mapper(Address, addresses)
-        m = mapper(User, users, properties={
-            'boston_addresses' : relation(m2, primaryjoin=
-                        and_(users.c.user_id==Address.c.user_id, 
-                        Address.c.email_address.like('%boston%'))),
-            'newyork_addresses' : relation(m2, primaryjoin=
-                        and_(users.c.user_id==Address.c.user_id, 
-                        Address.c.email_address.like('%newyork%'))),
+        # map to it - the identity of a KeywordUser object will be
+        # (user_id, keyword_id) since those are the primary keys involved
+        m = mapper(KeywordUser, j, properties={
+            'user_id':[users.c.user_id, userkeywords.c.user_id],
+            'keyword_id':[userkeywords.c.keyword_id, keywords.c.keyword_id],
+            'keyword_name':keywords.c.name
+
         })
-        u = User()
-        a = Address()
-        a.email_address = 'foo@boston.com'
-        b = Address()
-        b.email_address = 'bar@newyork.com'
-        
-        u.boston_addresses.append(a)
-        u.newyork_addresses.append(b)
+
+        k = KeywordUser()
+        k.user_name = 'keyworduser'
+        k.keyword_name = 'a keyword'
         ctx.current.flush()
+        print m.instance_key(k)
+        id = (k.user_id, k.keyword_id)
+        ctx.current.clear()
+        k = ctx.current.query(KeywordUser).get(id)
+        assert k.user_name == 'keyworduser'
+        assert k.keyword_name == 'a keyword'
+
     
 class SaveTest2(UnitOfWorkTest):
 
index 7f7cebd05ed7956a2c53c44eb2acbaddd37fd77f..9958724e1383d376b25f3050c81cea0596d45512 100644 (file)
@@ -127,43 +127,26 @@ def data():
         dict(keyword_id=6, item_id=3)
     )
 
-    
-class User(object):
+class BaseObject(object):
+    def __repr__(self):
+        return "%s(%s)" % (self.__class__.__name__, ",".join("%s=%s" % (k, repr(v)) for k, v in self.__dict__.iteritems() if k[0] != '_'))
+        
+class User(BaseObject):
     def __init__(self):
         self.user_id = None
-    def __repr__(self):
-        return (
-"""
-objid: %d
-User ID: %s
-User Name: %s
-email address ?: %s
-Addresses: %s
-Orders: %s
-Open Orders %s
-Closed Orderss %s
-------------------
-""" % tuple([id(self), self.user_id, repr(self.user_name), repr(getattr(self, 'email_address', None))] + [repr(getattr(self, attr, None)) for attr in ('addresses', 'orders', 'open_orders', 'closed_orders')])
-)
 
-class Address(object):
-    def __repr__(self):
-        return "Address: " + repr(getattr(self, 'address_id', None)) + " " + repr(getattr(self, 'user_id', None)) + " " + repr(self.email_address)
+class Address(BaseObject):
+    pass
 
-class Order(object):
+class Order(BaseObject):
     def __init__(self):
         self.isopen=0
-    def __repr__(self):
-        return "Order: " + repr(self.description) + " " + repr(self.isopen) + " " + repr(getattr(self, 'items', None))
 
-class Item(object):
-    def __repr__(self):
-#        return repr(self.__dict__)
-        return "Item: " + repr(self.item_name) + " " + repr(getattr(self, 'keywords', None))
+class Item(BaseObject):
+    pass
     
-class Keyword(object):
-    def __repr__(self):
-        return "Keyword: %s/%s" % (repr(getattr(self, 'keyword_id', None)),repr(self.name))
+class Keyword(BaseObject):
+    pass
 
 user_result = [{'user_id' : 7}, {'user_id' : 8}, {'user_id' : 9}]