]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
(no commit message)
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 17 Sep 2005 03:47:19 +0000 (03:47 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 17 Sep 2005 03:47:19 +0000 (03:47 +0000)
test/objectstore.py [new file with mode: 0644]

diff --git a/test/objectstore.py b/test/objectstore.py
new file mode 100644 (file)
index 0000000..91e28b2
--- /dev/null
@@ -0,0 +1,325 @@
+from testbase import PersistTest, AssertMixin
+import unittest, sys, os
+from sqlalchemy.mapper import *
+import sqlalchemy.objectstore as objectstore
+
+#ECHO = True
+ECHO = False
+execfile("test/tables.py")
+db.echo = True
+
+
+class SaveTest(AssertMixin):
+
+    def testbasic(self):
+        # save two users
+        u = User()
+        u.user_name = 'savetester'
+
+        m = mapper(User, users)
+        u2 = User()
+        u2.user_name = 'savetester2'
+
+        objectstore.uow().register_new(u)
+        
+        objectstore.uow().commit(u)
+        objectstore.uow().commit()
+
+        # assert the first one retreives the same from the identity map
+        nu = m.get(u.user_id)
+        self.assert_(u is nu)
+        
+        # clear out the identity map, so next get forces a SELECT
+        objectstore.clear()
+
+
+        # check it again, identity should be different but ids the same
+        nu = m.get(u.user_id)
+        self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester')
+
+        # change first users name and save
+        u.user_name = 'modifiedname'
+        objectstore.uow().commit()
+
+        # select both
+        #objectstore.clear()
+        userlist = m.select(users.c.user_id.in_(u.user_id, u2.user_id), order_by=[users.c.user_name])
+        self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname')
+        self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2')
+
+    def testmultitable(self):
+        """tests a save of an object where each instance spans two tables. also tests
+        redefinition of the keynames for the column properties."""
+        usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
+        m = mapper(User, usersaddresses, table = users,  
+            properties = dict(
+                email = ColumnProperty(addresses.c.email_address), 
+                foo_id = ColumnProperty(users.c.user_id, addresses.c.user_id)
+                )
+            )
+            
+        u = User()
+        u.user_name = 'multitester'
+        u.email = 'multi@test.org'
+
+
+        objectstore.uow().commit()
+
+        print repr(u.__dict__)
+        usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall()
+        print repr(usertable[0].row)
+        self.assert_(usertable[0].row == (u.foo_id, 'multitester'))
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall()
+        print repr(addresstable[0].row)
+        self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'multi@test.org'))
+
+        u.email = 'lala@hey.com'
+        u.user_name = 'imnew'
+        objectstore.uow().commit()
+
+        usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall()
+        self.assert_(usertable[0].row == (u.foo_id, 'imnew'))
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall()
+        self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'lala@hey.com'))
+
+        u = m.select(users.c.user_id==u.foo_id)[0]
+        print repr(u.__dict__)
+
+    def testonetoone(self):
+        m = mapper(User, users, properties = dict(
+            address = relation(Address, addresses, lazy = True, uselist = False)
+        ))
+        u = User()
+        u.user_name = 'one2onetester'
+        u.address = Address()
+        u.address.email_address = 'myonlyaddress@foo.com'
+        objectstore.uow().commit()
+        u.user_name = 'imnew'
+        objectstore.uow().commit()
+        u.address.email_address = 'imnew@foo.com'
+        objectstore.uow().commit()
+
+    def testbackwardsonetoone(self):
+        # test 'backwards'
+#        m = mapper(Address, addresses, properties = dict(
+#            user = relation(User, users, foreignkey = addresses.c.user_id, primaryjoin = users.c.user_id == addresses.c.user_id, lazy = True, uselist = False)
+#        ))
+        m = mapper(Address, addresses, properties = dict(
+            user = relation(User, users, lazy = True, uselist = False)
+        ))
+        data = [
+            {'user_name' : 'thesub' , 'email_address' : 'bar@foo.com'},
+            {'user_name' : 'assdkfj' , 'email_address' : 'thesdf@asdf.com'},
+            {'user_name' : 'n4knd' , 'email_address' : 'asf3@bar.org'},
+            {'user_name' : 'v88f4' , 'email_address' : 'adsd5@llala.net'},
+            {'user_name' : 'asdf8d' , 'email_address' : 'theater@foo.com'}
+        ]
+        objects = []
+        for elem in data:
+            a = Address()
+            a.email_address = elem['email_address']
+            a.user = User()
+            a.user.user_name = elem['user_name']
+            objects.append(a)
+            
+        objectstore.uow().commit()
+        
+        objects[2].email_address = 'imnew@foo.bar'
+        objects[3].user = User()
+        objects[3].user.user_name = 'imnewlyadded'
+        
+        objectstore.uow().commit()
+        return
+        l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
+        r = engine.ResultProxy(l)
+        print repr(r.fetchone().row)
+        
+    def testonetomany(self):
+        """test basic save of one to many."""
+        m = mapper(User, users, properties = dict(
+            addresses = relation(Address, addresses, lazy = True)
+        ))
+        u = User()
+        u.user_name = 'one2manytester'
+        u.addresses = []
+        a = Address()
+        a.email_address = 'one2many@test.org'
+        u.addresses.append(a)
+        a2 = Address()
+        a2.email_address = 'lala@test.org'
+        u.addresses.append(a2)
+
+        objectstore.uow().commit()
+
+        usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.user_id)).execute()).fetchall()
+        self.assert_(usertable[0].row == (u.user_id, 'one2manytester'))
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute()).fetchall()
+        self.assert_(addresstable[0].row == (a2.address_id, u.user_id, 'lala@test.org'))
+        self.assert_(addresstable[1].row == (a.address_id, u.user_id, 'one2many@test.org'))
+
+        userid = u.user_id
+        addressid = a2.address_id
+        
+        a2.email_address = 'somethingnew@foo.com'
+
+        objectstore.uow().commit()
+        
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id == addressid).execute()).fetchall()
+        self.assert_(addresstable[0].row == (addressid, userid, 'somethingnew@foo.com'))
+        self.assert_(u.user_id == userid and a2.address_id == addressid)
+
+    def _testalias(self):
+        """tests that an alias of a table can be used in a mapper. 
+        the mapper has to locate the original table and columns to keep it all straight."""
+        ualias = Alias(users, 'ualias')
+        m = mapper(User, ualias)
+        u = User()
+        u.user_name = 'testalias'
+        m.save(u)
+        
+        u2 = m.select(ualias.c.user_id == u.user_id)[0]
+        self.assert_(u2 is u)
+
+    def _testremove(self):
+        m = mapper(User, users, properties = dict(
+            addresses = relation(Address, addresses, lazy = True)
+        ))
+        u = User()
+        u.user_name = 'one2manytester'
+        u.addresses = []
+        a = Address()
+        a.email_address = 'one2many@test.org'
+        u.addresses.append(a)
+        a2 = Address()
+        a2.email_address = 'lala@test.org'
+        u.addresses.append(a2)
+        m.save(u)
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall()
+        print repr(addresstable[0].row)
+        self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
+        self.assert_(addresstable[1].row == (a2.address_id, u.user_id, 'lala@test.org'))
+        del u.addresses[1]
+        m.save(u)
+        addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall()
+        print repr(addresstable)
+        self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
+        self.assert_(addresstable[1].row == (a2.address_id, None, 'lala@test.org'))
+
+    def testmanytomany(self):
+        items = orderitems
+
+        m = mapper(Item, items, properties = dict(
+                keywords = relation(Keyword, keywords, itemkeywords, lazy = False),
+            ))
+
+        keywordmapper = mapper(Keyword, keywords)
+
+        data = [Item,
+            {'item_name': 'mm_item1', 'keywords' : (Keyword,[{'name': 'big'},{'name': 'green'}, {'name': 'purple'},{'name': 'round'}])},
+            {'item_name': 'mm_item2', 'keywords' : (Keyword,[{'name':'blue'}, {'name':'imnew'},{'name':'round'}, {'name':'small'}])},
+            {'item_name': 'mm_item3', 'keywords' : (Keyword,[])},
+            {'item_name': 'mm_item4', 'keywords' : (Keyword,[{'name':'big'}, {'name':'blue'},])},
+            {'item_name': 'mm_item5', 'keywords' : (Keyword,[{'name':'big'},{'name':'exacting'},{'name':'green'}])},
+            {'item_name': 'mm_item6', 'keywords' : (Keyword,[{'name':'red'},{'name':'round'},{'name':'small'}])},
+        ]
+        objects = []
+        for elem in data[1:]:
+            item = Item()
+            objects.append(item)
+            item.item_name = elem['item_name']
+            item.keywords = []
+            if len(elem['keywords'][1]):
+                klist = keywordmapper.select(keywords.c.name.in_(*[e['name'] for e in elem['keywords'][1]]))
+            else:
+                klist = []
+            khash = {}
+            for k in klist:
+                khash[k.name] = k
+            for kname in [e['name'] for e in elem['keywords'][1]]:
+                try:
+                    k = khash[kname]
+                except KeyError:
+                    k = Keyword()
+                    k.name = kname
+                item.keywords.append(k)
+
+        objectstore.uow().commit()
+        print "OK!"
+        l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
+        self.assert_result(l, *data)
+        print "OK!"
+
+        objects[4].item_name = 'item4updated'
+        k = Keyword()
+        k.name = 'yellow'
+        objects[5].keywords.append(k)
+        
+        objectstore.uow().commit()
+        print "OK!"
+        return
+        objects[2].keywords.append(k)
+        print "added: " + repr(objects[2].keywords.added_items())
+        objectstore.uow().commit()
+        
+    def testassociation(self):
+        class IKAssociation(object):
+            def __repr__(self):
+                return "\nIKAssociation " + repr(self.item_id) + " " + repr(self.keyword)
+
+        items = orderitems
+
+        keywordmapper = mapper(Keyword, keywords)
+
+        m = mapper(Item, items, properties = dict(
+                keywords = relation(IKAssociation, itemkeywords, lazy = False, properties = dict(
+                    keyword = relation(Keyword, keywords, lazy = False, uselist = False)
+                ), primary_keys = [itemkeywords.c.item_id, itemkeywords.c.keyword_id])
+            ))
+
+        data = [Item,
+            {'item_name': 'a_item1', 'keywords' : (IKAssociation, 
+                                                    [
+                                                        {'keyword' : (Keyword, {'name': 'big'})},
+                                                        {'keyword' : (Keyword, {'name': 'green'})}, 
+                                                        {'keyword' : (Keyword, {'name': 'purple'})},
+                                                        {'keyword' : (Keyword, {'name': 'round'})}
+                                                    ]
+                                                 ) 
+            },
+            {'item_name': 'a_item2', 'keywords' : (IKAssociation, 
+                                                    [
+                                                        {'keyword' : (Keyword, {'name': 'huge'})},
+                                                        {'keyword' : (Keyword, {'name': 'violet'})}, 
+                                                        {'keyword' : (Keyword, {'name': 'yellow'})}
+                                                    ]
+                                                 ) 
+            },
+            {'item_name': 'a_item3', 'keywords' : (IKAssociation, 
+                                                    [
+                                                        {'keyword' : (Keyword, {'name': 'big'})},
+                                                        {'keyword' : (Keyword, {'name': 'blue'})}, 
+                                                    ]
+                                                 ) 
+            }
+        ]
+        for elem in data[1:]:
+            item = Item()
+            item.item_name = elem['item_name']
+            item.keywords = []
+            for kname in [e['keyword'][1]['name'] for e in elem['keywords'][1]]:
+                try:
+                    k = keywordmapper.select(keywords.c.name == kname)[0]
+                except IndexError:
+                    k = Keyword()
+                    k.name= kname
+                ik = IKAssociation()
+                ik.keyword = k
+                item.keywords.append(ik)
+
+        objectstore.uow().commit()
+
+        l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
+        self.assert_result(l, *data)
+
+if __name__ == "__main__":
+    unittest.main()