From 3de35606c6dac008c9d00ccb01a7fe51fda3fead Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 17 Sep 2005 07:03:04 +0000 Subject: [PATCH] --- lib/sqlalchemy/mapper.py | 12 +- test/mapper.py | 367 +-------------------------------------- 2 files changed, 7 insertions(+), 372 deletions(-) diff --git a/lib/sqlalchemy/mapper.py b/lib/sqlalchemy/mapper.py index fd6d50f211..8a8de42101 100644 --- a/lib/sqlalchemy/mapper.py +++ b/lib/sqlalchemy/mapper.py @@ -516,13 +516,13 @@ class PropertyLoader(MapperProperty): else: raise " no foreign key ?" - def process_dependencies(self, deplist, uow): + def process_dependencies(self, deplist, uowcommit): def getlist(obj): if self.uselist: - return uow.register_list_attribute(obj, self.key) + return uowcommit.uow.register_list_attribute(obj, self.key) else: - return uow.register_attribute(obj, self.key) + return uowcommit.uow.register_attribute(obj, self.key) clearkeys = False @@ -546,7 +546,7 @@ class PropertyLoader(MapperProperty): self.primaryjoin.accept_visitor(setter) self.secondaryjoin.accept_visitor(setter) secondary_delete.append(associationrow) - uow.register_saved_list(childlist) + uowcommit.register_saved_list(childlist) if len(secondary_delete): statement = self.secondary.delete(sql.and_(*[c == sql.bindparam(c.key) for c in self.secondary.c])) statement.execute(*secondary_delete) @@ -559,7 +559,7 @@ class PropertyLoader(MapperProperty): for child in childlist.added_items(): associationrow = {} self.primaryjoin.accept_visitor(setter) - uow.register_saved_list(childlist) + uowcommit.register_saved_list(childlist) # TODO: deleted items elif self.foreignkey.table == self.parent.table: for child in deplist: @@ -567,7 +567,7 @@ class PropertyLoader(MapperProperty): for obj in childlist.added_items(): associationrow = {} self.primaryjoin.accept_visitor(setter) - uow.register_saved_list(childlist) + uowcommit.register_saved_list(childlist) # TODO: deleted items else: raise " no foreign key ?" diff --git a/test/mapper.py b/test/mapper.py index 3b5becd9d0..2d457542d2 100644 --- a/test/mapper.py +++ b/test/mapper.py @@ -1,4 +1,4 @@ -from testbase import PersistTest +from testbase import PersistTest, AssertMixin import unittest, sys, os from sqlalchemy.mapper import * import sqlalchemy.objectstore as objectstore @@ -8,58 +8,6 @@ ECHO = False execfile("test/tables.py") db.echo = True -class User(object): - def __init__(self): - self.user_id = None - def __repr__(self): - return ( -""" -objid: %d -User ID: %s -User Name: %s -email address ?: %s -Addresses: %s -Orders: %s -Open Orders %s -Closed Orderss %s ------------------- -""" % tuple([id(self), self.user_id, repr(self.user_name), repr(getattr(self, 'email_address', None))] + [repr(getattr(self, attr, None)) for attr in ('addresses', 'orders', 'open_orders', 'closed_orders')]) -) - -class Address(object): - def __repr__(self): - return "Address: " + repr(getattr(self, 'address_id', None)) + " " + repr(getattr(self, 'user_id', None)) + " " + repr(self.email_address) - -class Order(object): - def __repr__(self): - return "Order: " + repr(self.description) + " " + repr(self.isopen) + " " + repr(getattr(self, 'items', None)) - -class Item(object): - def __repr__(self): - return "Item: " + repr(self.item_name) + " " +repr(getattr(self, 'keywords', None)) - -class Keyword(object): - def __repr__(self): - return "Keyword: %s/%s" % (repr(getattr(self, 'keyword_id', None)),repr(self.name)) - -class AssertMixin(PersistTest): - def assert_result(self, result, class_, *objects): - print repr(result) - self.assert_list(result, class_, objects) - def assert_list(self, result, class_, list): - self.assert_(len(result) == len(list), "result list is not the same size as test list") - for i in range(0, len(list)): - self.assert_row(class_, result[i], list[i]) - def assert_row(self, class_, rowobj, desc): - self.assert_(rowobj.__class__ is class_, "item class is not " + repr(class_)) - for key, value in desc.iteritems(): - if isinstance(value, tuple): - if isinstance(value[1], list): - self.assert_list(getattr(rowobj, key), value[0], value[1]) - else: - self.assert_row(value[0], getattr(rowobj, key), value[1]) - else: - self.assert_(getattr(rowobj, key) == value, "attribute %s value %s does not match %s" % (key, getattr(rowobj, key), value)) class MapperTest(AssertMixin): @@ -361,319 +309,6 @@ class EagerTest(AssertMixin): ])}, ) -class SaveTest(AssertMixin): - - def testbasic(self): - # save two users - - - u = User() - u.user_name = 'savetester' - - m = mapper(User, users) - u2 = User() - u2.user_name = 'savetester2' - - objectstore.uow().register_new(u) - - objectstore.uow().commit(u) - objectstore.uow().commit() - - # assert the first one retreives the same from the identity map - nu = m.get(u.user_id) - self.assert_(u is nu) - - # clear out the identity map, so next get forces a SELECT - objectstore.clear() - - - # check it again, identity should be different but ids the same - nu = m.get(u.user_id) - self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester') - - # change first users name and save - u.user_name = 'modifiedname' - objectstore.uow().commit() - - # select both - #objectstore.clear() - userlist = m.select(users.c.user_id.in_(u.user_id, u2.user_id), order_by=[users.c.user_name]) - self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname') - self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2') - - def testmultitable(self): - """tests a save of an object where each instance spans two tables. also tests - redefinition of the keynames for the column properties.""" - usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id) - m = mapper(User, usersaddresses, table = users, - properties = dict( - email = ColumnProperty(addresses.c.email_address), - foo_id = ColumnProperty(users.c.user_id, addresses.c.user_id) - ) - ) - - u = User() - u.user_name = 'multitester' - u.email = 'multi@test.org' - - - objectstore.uow().commit() - - print repr(u.__dict__) - usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall() - print repr(usertable[0].row) - self.assert_(usertable[0].row == (u.foo_id, 'multitester')) - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall() - print repr(addresstable[0].row) - self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'multi@test.org')) - - u.email = 'lala@hey.com' - u.user_name = 'imnew' - objectstore.uow().commit() - - usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.foo_id)).execute()).fetchall() - self.assert_(usertable[0].row == (u.foo_id, 'imnew')) - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(u.address_id)).execute()).fetchall() - self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'lala@hey.com')) - - u = m.select(users.c.user_id==u.foo_id)[0] - print repr(u.__dict__) - - def testonetoone(self): - m = mapper(User, users, properties = dict( - address = relation(Address, addresses, lazy = True, uselist = False) - )) - u = User() - u.user_name = 'one2onetester' - u.address = Address() - u.address.email_address = 'myonlyaddress@foo.com' - objectstore.uow().commit() - u.user_name = 'imnew' - objectstore.uow().commit() - u.address.email_address = 'imnew@foo.com' - objectstore.uow().commit() - - def testbackwardsonetoone(self): - # test 'backwards' -# m = mapper(Address, addresses, properties = dict( -# user = relation(User, users, foreignkey = addresses.c.user_id, primaryjoin = users.c.user_id == addresses.c.user_id, lazy = True, uselist = False) -# )) - m = mapper(Address, addresses, properties = dict( - user = relation(User, users, lazy = True, uselist = False) - )) - data = [ - {'user_name' : 'thesub' , 'email_address' : 'bar@foo.com'}, - {'user_name' : 'assdkfj' , 'email_address' : 'thesdf@asdf.com'}, - {'user_name' : 'n4knd' , 'email_address' : 'asf3@bar.org'}, - {'user_name' : 'v88f4' , 'email_address' : 'adsd5@llala.net'}, - {'user_name' : 'asdf8d' , 'email_address' : 'theater@foo.com'} - ] - objects = [] - for elem in data: - a = Address() - a.email_address = elem['email_address'] - a.user = User() - a.user.user_name = elem['user_name'] - objects.append(a) - - objectstore.uow().commit() - - objects[2].email_address = 'imnew@foo.bar' - objects[3].user = User() - objects[3].user.user_name = 'imnewlyadded' - - objectstore.uow().commit() - return - l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute() - r = engine.ResultProxy(l) - print repr(r.fetchone().row) - - def testonetomany(self): - """test basic save of one to many.""" - m = mapper(User, users, properties = dict( - addresses = relation(Address, addresses, lazy = True) - )) - u = User() - u.user_name = 'one2manytester' - u.addresses = [] - a = Address() - a.email_address = 'one2many@test.org' - u.addresses.append(a) - a2 = Address() - a2.email_address = 'lala@test.org' - u.addresses.append(a2) - - objectstore.uow().commit() - - usertable = engine.ResultProxy(users.select(users.c.user_id.in_(u.user_id)).execute()).fetchall() - self.assert_(usertable[0].row == (u.user_id, 'one2manytester')) - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute()).fetchall() - self.assert_(addresstable[0].row == (a2.address_id, u.user_id, 'lala@test.org')) - self.assert_(addresstable[1].row == (a.address_id, u.user_id, 'one2many@test.org')) - - userid = u.user_id - addressid = a2.address_id - - a2.email_address = 'somethingnew@foo.com' - - objectstore.uow().commit() - - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id == addressid).execute()).fetchall() - self.assert_(addresstable[0].row == (addressid, userid, 'somethingnew@foo.com')) - self.assert_(u.user_id == userid and a2.address_id == addressid) - - def _testalias(self): - """tests that an alias of a table can be used in a mapper. - the mapper has to locate the original table and columns to keep it all straight.""" - ualias = Alias(users, 'ualias') - m = mapper(User, ualias) - u = User() - u.user_name = 'testalias' - m.save(u) - - u2 = m.select(ualias.c.user_id == u.user_id)[0] - self.assert_(u2 is u) - - def _testremove(self): - m = mapper(User, users, properties = dict( - addresses = relation(Address, addresses, lazy = True) - )) - u = User() - u.user_name = 'one2manytester' - u.addresses = [] - a = Address() - a.email_address = 'one2many@test.org' - u.addresses.append(a) - a2 = Address() - a2.email_address = 'lala@test.org' - u.addresses.append(a2) - m.save(u) - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall() - print repr(addresstable[0].row) - self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org')) - self.assert_(addresstable[1].row == (a2.address_id, u.user_id, 'lala@test.org')) - del u.addresses[1] - m.save(u) - addresstable = engine.ResultProxy(addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute()).fetchall() - print repr(addresstable) - self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org')) - self.assert_(addresstable[1].row == (a2.address_id, None, 'lala@test.org')) - - def testmanytomany(self): - items = orderitems - - m = mapper(Item, items, properties = dict( - keywords = relation(Keyword, keywords, itemkeywords, lazy = False), - )) - - keywordmapper = mapper(Keyword, keywords) - - data = [Item, - {'item_name': 'mm_item1', 'keywords' : (Keyword,[{'name': 'big'},{'name': 'green'}, {'name': 'purple'},{'name': 'round'}])}, - {'item_name': 'mm_item2', 'keywords' : (Keyword,[{'name':'blue'}, {'name':'imnew'},{'name':'round'}, {'name':'small'}])}, - {'item_name': 'mm_item3', 'keywords' : (Keyword,[])}, - {'item_name': 'mm_item4', 'keywords' : (Keyword,[{'name':'big'}, {'name':'blue'},])}, - {'item_name': 'mm_item5', 'keywords' : (Keyword,[{'name':'big'},{'name':'exacting'},{'name':'green'}])}, - {'item_name': 'mm_item6', 'keywords' : (Keyword,[{'name':'red'},{'name':'round'},{'name':'small'}])}, - ] - objects = [] - for elem in data[1:]: - item = Item() - objects.append(item) - item.item_name = elem['item_name'] - item.keywords = [] - if len(elem['keywords'][1]): - klist = keywordmapper.select(keywords.c.name.in_(*[e['name'] for e in elem['keywords'][1]])) - else: - klist = [] - khash = {} - for k in klist: - khash[k.name] = k - for kname in [e['name'] for e in elem['keywords'][1]]: - try: - k = khash[kname] - except KeyError: - k = Keyword() - k.name = kname - item.keywords.append(k) - - objectstore.uow().commit() - print "OK!" - l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name]) - self.assert_result(l, *data) - print "OK!" - - objects[4].item_name = 'item4updated' - k = Keyword() - k.name = 'yellow' - objects[5].keywords.append(k) - - objectstore.uow().commit() - print "OK!" - return - objects[2].keywords.append(k) - print "added: " + repr(objects[2].keywords.added_items()) - objectstore.uow().commit() - - def testassociation(self): - class IKAssociation(object): - def __repr__(self): - return "\nIKAssociation " + repr(self.item_id) + " " + repr(self.keyword) - - items = orderitems - - keywordmapper = mapper(Keyword, keywords) - - m = mapper(Item, items, properties = dict( - keywords = relation(IKAssociation, itemkeywords, lazy = False, properties = dict( - keyword = relation(Keyword, keywords, lazy = False, uselist = False) - ), primary_keys = [itemkeywords.c.item_id, itemkeywords.c.keyword_id]) - )) - - data = [Item, - {'item_name': 'a_item1', 'keywords' : (IKAssociation, - [ - {'keyword' : (Keyword, {'name': 'big'})}, - {'keyword' : (Keyword, {'name': 'green'})}, - {'keyword' : (Keyword, {'name': 'purple'})}, - {'keyword' : (Keyword, {'name': 'round'})} - ] - ) - }, - {'item_name': 'a_item2', 'keywords' : (IKAssociation, - [ - {'keyword' : (Keyword, {'name': 'huge'})}, - {'keyword' : (Keyword, {'name': 'violet'})}, - {'keyword' : (Keyword, {'name': 'yellow'})} - ] - ) - }, - {'item_name': 'a_item3', 'keywords' : (IKAssociation, - [ - {'keyword' : (Keyword, {'name': 'big'})}, - {'keyword' : (Keyword, {'name': 'blue'})}, - ] - ) - } - ] - for elem in data[1:]: - item = Item() - item.item_name = elem['item_name'] - item.keywords = [] - for kname in [e['keyword'][1]['name'] for e in elem['keywords'][1]]: - try: - k = keywordmapper.select(keywords.c.name == kname)[0] - except IndexError: - k = Keyword() - k.name= kname - ik = IKAssociation() - ik.keyword = k - item.keywords.append(ik) - - objectstore.uow().commit() - - l = m.select(items.c.item_name.in_(*[e['item_name'] for e in data[1:]]), order_by=[items.c.item_name, keywords.c.name]) - self.assert_result(l, *data) if __name__ == "__main__": unittest.main() -- 2.47.2