-from testbase import PersistTest
+from testbase import PersistTest, AssertMixin
import unittest, sys, os
from sqlalchemy.mapper import *
import sqlalchemy.objectstore as objectstore
execfile("test/tables.py")
db.echo = True
-class User(object):
- def __init__(self):
- self.user_id = None
- def __repr__(self):
- return (
-"""
-objid: %d
-User ID: %s
-User Name: %s
-email address ?: %s
-Addresses: %s
-Orders: %s
-Open Orders %s
-Closed Orderss %s
-------------------
-""" % tuple([id(self), self.user_id, repr(self.user_name), repr(getattr(self, 'email_address', None))] + [repr(getattr(self, attr, None)) for attr in ('addresses', 'orders', 'open_orders', 'closed_orders')])
-)
-
-class Address(object):
- def __repr__(self):
- return "Address: " + repr(getattr(self, 'address_id', None)) + " " + repr(getattr(self, 'user_id', None)) + " " + repr(self.email_address)
-
-class Order(object):
- def __repr__(self):
- return "Order: " + repr(self.description) + " " + repr(self.isopen) + " " + repr(getattr(self, 'items', None))
-
-class Item(object):
- def __repr__(self):
- return "Item: " + repr(self.item_name) + " " +repr(getattr(self, 'keywords', None))
-
-class Keyword(object):
- def __repr__(self):
- return "Keyword: %s/%s" % (repr(getattr(self, 'keyword_id', None)),repr(self.name))
-
-class AssertMixin(PersistTest):
- def assert_result(self, result, class_, *objects):
- print repr(result)
- self.assert_list(result, class_, objects)
- def assert_list(self, result, class_, list):
- self.assert_(len(result) == len(list), "result list is not the same size as test list")
- for i in range(0, len(list)):
- self.assert_row(class_, result[i], list[i])
- def assert_row(self, class_, rowobj, desc):
- self.assert_(rowobj.__class__ is class_, "item class is not " + repr(class_))
- for key, value in desc.iteritems():
- if isinstance(value, tuple):
- if isinstance(value[1], list):
- self.assert_list(getattr(rowobj, key), value[0], value[1])
- else:
- self.assert_row(value[0], getattr(rowobj, key), value[1])
- else:
- self.assert_(getattr(rowobj, key) == value, "attribute %s value %s does not match %s" % (key, getattr(rowobj, key), value))
class MapperTest(AssertMixin):
])},
)
-class SaveTest(AssertMixin):
-
- def testbasic(self):
- # save two users
-
-
- u = User()
- u.user_name = 'savetester'
-
- m = mapper(User, users)
- u2 = User()
- u2.user_name = 'savetester2'
-
- objectstore.uow().register_new(u)
-
- objectstore.uow().commit(u)
- objectstore.uow().commit()
-
- # assert the first one retreives the same from the identity map
- nu = m.get(u.user_id)
- self.assert_(u is nu)
-
- # clear out the identity map, so next get forces a SELECT
- objectstore.clear()
-
-
- # check it again, identity should be different but ids the same
- nu = m.get(u.user_id)
- self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester')
-
- # change first users name and save
- u.user_name = 'modifiedname'
- objectstore.uow().commit()
-
- # select both
- #objectstore.clear()
- userlist = m.select(users.c.user_id.in_(u.user_id, u2.user_id), order_by=[users.c.user_name])
- self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname')
- self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2')
-
- def testmultitable(self):
- """tests a save of an object where each instance spans two tables. also tests
- redefinition of the keynames for the column properties."""
- usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
- m = mapper(User, usersaddresses, table = users,
- properties = dict(
- email = ColumnProperty(addresses.c.email_address),
- foo_id = ColumnProperty(users.c.user_id, addresses.c.user_id)
- )
- )
-
- u = User()
- u.user_name = 'multitester'
- u.email = 'multi@test.org'
-
-
- objectstore.uow().commit()
-
- print repr(u.__dict__)
- usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall()
- print repr(usertable[0].row)
- self.assert_(usertable[0].row == (u.foo_id, 'multitester'))
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall()
- print repr(addresstable[0].row)
- self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'multi@test.org'))
-
- u.email = 'lala@hey.com'
- u.user_name = 'imnew'
- objectstore.uow().commit()
-
- usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall()
- self.assert_(usertable[0].row == (u.foo_id, 'imnew'))
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall()
- self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'lala@hey.com'))
-
- u = m.select(users.c.user_id==u.foo_id)[0]
- print repr(u.__dict__)
-
- def testonetoone(self):
- m = mapper(User, users, properties = dict(
- address = relation(Address, addresses, lazy = True, uselist = False)
- ))
- u = User()
- u.user_name = 'one2onetester'
- u.address = Address()
- u.address.email_address = 'myonlyaddress@foo.com'
- objectstore.uow().commit()
- u.user_name = 'imnew'
- objectstore.uow().commit()
- u.address.email_address = 'imnew@foo.com'
- objectstore.uow().commit()
-
- def testbackwardsonetoone(self):
- # test 'backwards'
-# m = mapper(Address, addresses, properties = dict(
-# user = relation(User, users, foreignkey = addresses.c.user_id, primaryjoin = users.c.user_id == addresses.c.user_id, lazy = True, uselist = False)
-# ))
- m = mapper(Address, addresses, properties = dict(
- user = relation(User, users, lazy = True, uselist = False)
- ))
- data = [
- {'user_name' : 'thesub' , 'email_address' : 'bar@foo.com'},
- {'user_name' : 'assdkfj' , 'email_address' : 'thesdf@asdf.com'},
- {'user_name' : 'n4knd' , 'email_address' : 'asf3@bar.org'},
- {'user_name' : 'v88f4' , 'email_address' : 'adsd5@llala.net'},
- {'user_name' : 'asdf8d' , 'email_address' : 'theater@foo.com'}
- ]
- objects = []
- for elem in data:
- a = Address()
- a.email_address = elem['email_address']
- a.user = User()
- a.user.user_name = elem['user_name']
- objects.append(a)
-
- objectstore.uow().commit()
-
- objects[2].email_address = 'imnew@foo.bar'
- objects[3].user = User()
- objects[3].user.user_name = 'imnewlyadded'
-
- objectstore.uow().commit()
- return
- l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
- r = engine.ResultProxy(l)
- print repr(r.fetchone().row)
-
- def testonetomany(self):
- """test basic save of one to many."""
- m = mapper(User, users, properties = dict(
- addresses = relation(Address, addresses, lazy = True)
- ))
- u = User()
- u.user_name = 'one2manytester'
- u.addresses = []
- a = Address()
- a.email_address = 'one2many@test.org'
- u.addresses.append(a)
- a2 = Address()
- a2.email_address = 'lala@test.org'
- u.addresses.append(a2)
-
- objectstore.uow().commit()
-
- usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.user_id)).execute()).fetchall()
- self.assert_(usertable[0].row == (u.user_id, 'one2manytester'))
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute()).fetchall()
- self.assert_(addresstable[0].row == (a2.address_id, u.user_id, 'lala@test.org'))
- self.assert_(addresstable[1].row == (a.address_id, u.user_id, 'one2many@test.org'))
-
- userid = u.user_id
- addressid = a2.address_id
-
- a2.email_address = 'somethingnew@foo.com'
-
- objectstore.uow().commit()
-
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id == addressid).execute()).fetchall()
- self.assert_(addresstable[0].row == (addressid, userid, 'somethingnew@foo.com'))
- self.assert_(u.user_id == userid and a2.address_id == addressid)
-
- def _testalias(self):
- """tests that an alias of a table can be used in a mapper.
- the mapper has to locate the original table and columns to keep it all straight."""
- ualias = Alias(users, 'ualias')
- m = mapper(User, ualias)
- u = User()
- u.user_name = 'testalias'
- m.save(u)
-
- u2 = m.select(ualias.c.user_id == u.user_id)[0]
- self.assert_(u2 is u)
-
- def _testremove(self):
- m = mapper(User, users, properties = dict(
- addresses = relation(Address, addresses, lazy = True)
- ))
- u = User()
- u.user_name = 'one2manytester'
- u.addresses = []
- a = Address()
- a.email_address = 'one2many@test.org'
- u.addresses.append(a)
- a2 = Address()
- a2.email_address = 'lala@test.org'
- u.addresses.append(a2)
- m.save(u)
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall()
- print repr(addresstable[0].row)
- self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
- self.assert_(addresstable[1].row == (a2.address_id, u.user_id, 'lala@test.org'))
- del u.addresses[1]
- m.save(u)
- addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall()
- print repr(addresstable)
- self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
- self.assert_(addresstable[1].row == (a2.address_id, None, 'lala@test.org'))
-
- def testmanytomany(self):
- items = orderitems
-
- m = mapper(Item, items, properties = dict(
- keywords = relation(Keyword, keywords, itemkeywords, lazy = False),
- ))
-
- keywordmapper = mapper(Keyword, keywords)
-
- data = [Item,
- {'item_name': 'mm_item1', 'keywords' : (Keyword,[{'name': 'big'},{'name': 'green'}, {'name': 'purple'},{'name': 'round'}])},
- {'item_name': 'mm_item2', 'keywords' : (Keyword,[{'name':'blue'}, {'name':'imnew'},{'name':'round'}, {'name':'small'}])},
- {'item_name': 'mm_item3', 'keywords' : (Keyword,[])},
- {'item_name': 'mm_item4', 'keywords' : (Keyword,[{'name':'big'}, {'name':'blue'},])},
- {'item_name': 'mm_item5', 'keywords' : (Keyword,[{'name':'big'},{'name':'exacting'},{'name':'green'}])},
- {'item_name': 'mm_item6', 'keywords' : (Keyword,[{'name':'red'},{'name':'round'},{'name':'small'}])},
- ]
- objects = []
- for elem in data[1:]:
- item = Item()
- objects.append(item)
- item.item_name = elem['item_name']
- item.keywords = []
- if len(elem['keywords'][1]):
- klist = keywordmapper.select(keywords.c.name.in_(*[e['name'] for e in elem['keywords'][1]]))
- else:
- klist = []
- khash = {}
- for k in klist:
- khash[k.name] = k
- for kname in [e['name'] for e in elem['keywords'][1]]:
- try:
- k = khash[kname]
- except KeyError:
- k = Keyword()
- k.name = kname
- item.keywords.append(k)
-
- objectstore.uow().commit()
- print "OK!"
- l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
- self.assert_result(l, *data)
- print "OK!"
-
- objects[4].item_name = 'item4updated'
- k = Keyword()
- k.name = 'yellow'
- objects[5].keywords.append(k)
-
- objectstore.uow().commit()
- print "OK!"
- return
- objects[2].keywords.append(k)
- print "added: " + repr(objects[2].keywords.added_items())
- objectstore.uow().commit()
-
- def testassociation(self):
- class IKAssociation(object):
- def __repr__(self):
- return "\nIKAssociation " + repr(self.item_id) + " " + repr(self.keyword)
-
- items = orderitems
-
- keywordmapper = mapper(Keyword, keywords)
-
- m = mapper(Item, items, properties = dict(
- keywords = relation(IKAssociation, itemkeywords, lazy = False, properties = dict(
- keyword = relation(Keyword, keywords, lazy = False, uselist = False)
- ), primary_keys = [itemkeywords.c.item_id, itemkeywords.c.keyword_id])
- ))
-
- data = [Item,
- {'item_name': 'a_item1', 'keywords' : (IKAssociation,
- [
- {'keyword' : (Keyword, {'name': 'big'})},
- {'keyword' : (Keyword, {'name': 'green'})},
- {'keyword' : (Keyword, {'name': 'purple'})},
- {'keyword' : (Keyword, {'name': 'round'})}
- ]
- )
- },
- {'item_name': 'a_item2', 'keywords' : (IKAssociation,
- [
- {'keyword' : (Keyword, {'name': 'huge'})},
- {'keyword' : (Keyword, {'name': 'violet'})},
- {'keyword' : (Keyword, {'name': 'yellow'})}
- ]
- )
- },
- {'item_name': 'a_item3', 'keywords' : (IKAssociation,
- [
- {'keyword' : (Keyword, {'name': 'big'})},
- {'keyword' : (Keyword, {'name': 'blue'})},
- ]
- )
- }
- ]
- for elem in data[1:]:
- item = Item()
- item.item_name = elem['item_name']
- item.keywords = []
- for kname in [e['keyword'][1]['name'] for e in elem['keywords'][1]]:
- try:
- k = keywordmapper.select(keywords.c.name == kname)[0]
- except IndexError:
- k = Keyword()
- k.name= kname
- ik = IKAssociation()
- ik.keyword = k
- item.keywords.append(ik)
-
- objectstore.uow().commit()
-
- l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name])
- self.assert_result(l, *data)
if __name__ == "__main__":
unittest.main()