From: Mike Bayer Date: Fri, 29 Jun 2007 21:40:59 +0000 (+0000) Subject: migrated most of mapper/EagerTest to updated eager_relations module X-Git-Tag: rel_0_4_6~157 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=239fe7c323f3a6d3d3133e4b22eb4e7d0689a219;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git migrated most of mapper/EagerTest to updated eager_relations module --- diff --git a/test/orm/alltests.py b/test/orm/alltests.py index 133d517661..5b3038fd94 100644 --- a/test/orm/alltests.py +++ b/test/orm/alltests.py @@ -5,9 +5,10 @@ import inheritance.alltests as inheritance def suite(): modules_to_test = ( - 'orm.attributes', - 'orm.query', - 'orm.lazy_relations', + 'orm.attributes', + 'orm.query', + 'orm.lazy_relations', + 'orm.eager_relations', 'orm.mapper', 'orm.generative', 'orm.lazytest1', diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py new file mode 100644 index 0000000000..0649917238 --- /dev/null +++ b/test/orm/eager_relations.py @@ -0,0 +1,426 @@ +"""basic tests of eager loaded attributes""" + +from sqlalchemy import * +from sqlalchemy.orm import * +import testbase + +from fixtures import * +from query import QueryTest + +class EagerTest(QueryTest): + keep_mappers = False + + def setup_mappers(self): + pass + + def test_basic(self): + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False) + }) + sess = create_session() + q = sess.query(User) + + assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all() + + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == q.all() + + def test_no_orphan(self): + """test that an eagerly loaded child object is not marked as an orphan""" + + mapper(User, users, properties={ + 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False) + }) + mapper(Address, addresses) + + sess = create_session() + user = sess.query(User).get(7) + assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) + assert not class_mapper(Address)._is_orphan(user.addresses[0]) + + def test_orderby(self): + mapper(User, users, properties = { + 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address), + }) + q = create_session().query(User) + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=2, email_address='ed@wood.com') + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == q.all() + + def test_orderby_secondary(self): + """tests that a regular mapper select on a single table can order by a relation to a second table""" + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + )) + + q = create_session().query(User) + l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all() + + assert [ + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=7, addresses=[ + Address(id=1) + ]), + ] == l + + def test_orderby_desc(self): + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False, order_by=[desc(addresses.c.email_address)]), + )) + sess = create_session() + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=3, email_address='ed@bettyboop.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == sess.query(User).all() + + def test_many_to_many(self): + + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=False), + )) + + q = create_session().query(Item) + def go(): + assert [ + Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]), + Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]), + Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]), + Item(id=4, keywords=[]), + Item(id=5, keywords=[]), + ] == q.all() + self.assert_sql_count(testbase.db, go, 1) + + def go(): + assert [ + Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]), + Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]), + ] == q.join('keywords').filter(keywords.c.name == 'red').all() + self.assert_sql_count(testbase.db, go, 1) + + + def test_eager_option(self): + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=True), + )) + + q = create_session().query(Item) + + def go(): + assert [ + Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]), + Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]), + ] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all() + + self.assert_sql_count(testbase.db, go, 1) + + def test_cyclical(self): + """test that a circular eager relationship breaks the cycle with a lazy loader""" + + mapper(Address, addresses) + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False, backref=backref('user', lazy=False)) + )) + assert class_mapper(User).props['addresses'].lazy is False + assert class_mapper(Address).props['user'].lazy is False + + sess = create_session() + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == sess.query(User).all() + + def test_double(self): + """tests lazy loading with two relations simulatneously, from the same table, using aliases. """ + openorders = alias(orders, 'openorders') + closedorders = alias(orders, 'closedorders') + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=False), + closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=False) + )) + q = create_session().query(User) + + def go(): + assert [ + User( + id=7, + addresses=[Address(id=1)], + open_orders = [Order(id=3)], + closed_orders = [Order(id=1), Order(id=5)] + ), + User( + id=8, + addresses=[Address(id=2), Address(id=3), Address(id=4)], + open_orders = [], + closed_orders = [] + ), + User( + id=9, + addresses=[Address(id=5)], + open_orders = [Order(id=4)], + closed_orders = [Order(id=2)] + ), + User(id=10) + + ] == q.all() + self.assert_sql_count(testbase.db, go, 1) + + def test_limit(self): + """test limit operations combined with lazy-load relationships.""" + + mapper(Item, items) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items, lazy=False) + }) + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False), + 'orders':relation(Order, lazy=True) + }) + + sess = create_session() + q = sess.query(User) + + if testbase.db.engine.name == 'mssql': + l = q.limit(2).all() + assert self.user_all_result[:2] == l + else: + l = q.limit(2).offset(1).all() + print l + print self.user_all_result[1:3] + assert self.user_all_result[1:3] == l + + def test_distinct(self): + # this is an involved 3x union of the users table to get a lot of rows. + # then see if the "distinct" works its way out. you actually get the same + # result with or without the distinct, just via less or more rows. + u2 = users.alias('u2') + s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') + + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False), + }) + + sess = create_session() + q = sess.query(User) + + def go(): + l = q.filter(s.c.u2_id==User.c.id).distinct().all() + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == l + self.assert_sql_count(testbase.db, go, 1) + + def test_limit_2(self): + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]), + )) + + sess = create_session() + q = sess.query(Item) + l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\ + order_by(Item.c.id).limit(2).all() + + assert [ + Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]), + Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]), + ] == l + + def test_limit_3(self): + """test that the ORDER BY is propigated from the inner select to the outer select, when using the + 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses.""" + + mapper(Item, items) + mapper(Order, orders, properties = dict( + items = relation(Item, secondary=order_items, lazy=False) + )) + + mapper(Address, addresses) + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + orders = relation(Order, lazy=False), + )) + sess = create_session() + + q = sess.query(User) + + if testbase.db.engine.name != 'mssql': + l = q.join('orders').order_by(desc(orders.c.user_id)).limit(2).offset(1) + assert [ + User(id=9, + orders=[Order(id=2), Order(id=4)], + addresses=[Address(id=5)] + ), + User(id=7, + orders=[Order(id=1), Order(id=3), Order(id=5)], + addresses=[Address(id=1)] + ) + ] == l.all() + + l = q.join('addresses').order_by(desc(addresses.c.email_address)).limit(1).offset(0) + assert [ + User(id=7, + orders=[Order(id=1), Order(id=3), Order(id=5)], + addresses=[Address(id=1)] + ) + ] == l.all() + + def test_one_to_many_scalar(self): + mapper(User, users, properties = dict( + address = relation(mapper(Address, addresses), lazy=False, uselist=False) + )) + q = create_session().query(User) + + def go(): + l = q.filter(users.c.id == 7).all() + assert [User(id=7, address=Address(id=1))] == l + self.assert_sql_count(testbase.db, go, 1) + + def test_many_to_one(self): + mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy=False) + )) + sess = create_session() + q = sess.query(Address) + + def go(): + a = q.filter(addresses.c.id==1).one() + assert a.user is not None + u1 = sess.query(User).get(7) + assert a.user is u1 + self.assert_sql_count(testbase.db, go, 1) + + + def test_one_and_many(self): + """tests eager load for a parent object with a child object that + contains a many-to-many relationship to a third object.""" + + mapper(User, users, properties={ + 'orders':relation(Order, lazy=False) + }) + mapper(Item, items) + mapper(Order, orders, properties = dict( + items = relation(Item, secondary=order_items, lazy=False) + )) + + q = create_session().query(User) + + l = q.filter("users.id in (7, 8, 9)") + + def go(): + assert [ + User(id=7, orders=[ + Order(id=1, items=[Item(id=1), Item(id=2), Item(id=3)]), + Order(id=3, items=[Item(id=3), Item(id=4), Item(id=5)]), + Order(id=5, items=[Item(id=5)]), + ]), + User(id=8, orders=[]), + User(id=9, orders=[ + Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)]), + Order(id=4, items=[Item(id=1), Item(id=5)]), + ]), + ] == l.all() + self.assert_sql_count(testbase.db, go, 1) + + def test_double_with_aggregate(self): + + max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user') + + max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders') + + mapper(Order, orders) + mapper(User, users, properties={ + 'orders':relation(Order, backref='user', lazy=False), + 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False) + }) + q = create_session().query(User) + def go(): + assert [ + User(id=7, orders=[ + Order(id=1), + Order(id=3), + Order(id=5), + ], + max_order=Order(id=5) + ), + User(id=8, orders=[]), + User(id=9, orders=[Order(id=2),Order(id=4)], + max_order=Order(id=4) + ), + User(id=10), + ] == q.all() + self.assert_sql_count(testbase.db, go, 1) + +if __name__ == '__main__': + testbase.main() diff --git a/test/orm/fixtures.py b/test/orm/fixtures.py index 57991735e5..49652243a6 100644 --- a/test/orm/fixtures.py +++ b/test/orm/fixtures.py @@ -1,6 +1,7 @@ from sqlalchemy import * from testbase import Table, Column +_recursion_stack = util.Set() class Base(object): def __init__(self, **kwargs): for k in kwargs: @@ -15,26 +16,33 @@ class Base(object): only look at attributes that are present on the source object. """ - # use __dict__ to avoid instrumented properties - for attr in self.__dict__.keys(): - if attr[0] == '_': - continue - value = getattr(self, attr) - if hasattr(value, '__iter__') and not isinstance(value, basestring): - if len(value) == 0: + + if self in _recursion_stack: + return True + _recursion_stack.add(self) + try: + # use __dict__ to avoid instrumented properties + for attr in self.__dict__.keys(): + if attr[0] == '_': continue - for (us, them) in zip(value, getattr(other, attr)): - if us != them: - return False + value = getattr(self, attr) + if hasattr(value, '__iter__') and not isinstance(value, basestring): + if len(value) == 0: + continue + for (us, them) in zip(value, getattr(other, attr)): + if us != them: + return False + else: + continue else: - continue + if value is not None: + if value != getattr(other, attr): + return False else: - if value is not None: - if value != getattr(other, attr): - return False - else: - return True - + return True + finally: + _recursion_stack.remove(self) + class User(Base):pass class Order(Base):pass class Item(Base):pass diff --git a/test/orm/mapper.py b/test/orm/mapper.py index 370027a880..0dd40f6657 100644 --- a/test/orm/mapper.py +++ b/test/orm/mapper.py @@ -869,138 +869,6 @@ class MapperExtensionTest(MapperSuperTest): self.assert_result(l, User, *user_address_result) class EagerTest(MapperSuperTest): - def testbasic(self): - """tests a basic one-to-many eager load""" - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, *user_address_result) - - def testorderby(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False, order_by=addresses.c.email_address), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])}, - {'user_id' : 9, 'addresses' : (Address, [])} - ) - - - def testorderby_desc(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]), - )) - q = create_session().query(m) - l = q.select() - - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}, ])}, - {'user_id' : 9, 'addresses' : (Address, [])}, - ) - - def testlimit(self): - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = False) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), - )) - sess = create_session() - q = sess.query(m) - - if db.engine.name == 'mssql': - l = q.select(limit=2) - self.assert_result(l, User, *user_all_result[:2]) - else: - l = q.select(limit=2, offset=1) - self.assert_result(l, User, *user_all_result[1:3]) - - # this is an involved 3x union of the users table to get a lot of rows. - # then see if the "distinct" works its way out. you actually get the same - # result with or without the distinct, just via less or more rows. - u2 = users.alias('u2') - s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') - l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True) - self.assert_result(l, User, *user_all_result) - sess.clear() - clear_mappers() - m = mapper(Item, orderitems, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), - )) - q = sess.query(m) - l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2) - self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]]) - - def testmorelimit(self): - """test that the ORDER BY is propigated from the inner select to the outer select, when using the - 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses.""" - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = False) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), - )) - sess = create_session() - q = sess.query(m) - - if db.engine.name != 'mssql': - l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1) - self.assert_result(l, User, *(user_all_result[2], user_all_result[0])) - - l = q.select(q.join_to('addresses'), order_by=desc(addresses.c.email_address), limit=1, offset=0) - self.assert_result(l, User, *(user_all_result[0],)) - - def testonetoone(self): - m = mapper(User, users, properties = dict( - address = relation(mapper(Address, addresses), lazy = False, uselist = False) - )) - q = create_session().query(m) - l = q.select(users.c.user_id == 7) - self.assert_result(l, User, - {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})}, - ) - - def testbackwardsonetoone(self): - m = mapper(Address, addresses, properties = dict( - user = relation(mapper(User, users), lazy = False) - )).compile() - self.echo(repr(m.props['user'].uselist)) - q = create_session().query(m) - l = q.select(addresses.c.address_id == 1) - self.assert_result(l, Address, - {'address_id' : 1, 'email_address' : 'jack@bean.com', - 'user' : (User, {'user_id' : 7, 'user_name' : 'jack'}) - }, - ) - - def testorphanstate(self): - """test that an eagerly loaded child object is not marked as an orphan""" - m = mapper(User, users, properties={ - 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False) - }) - mapper(Address, addresses) - - s = create_session() - q = s.query(m) - user = q.get(7) - assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) - assert not class_mapper(Address)._is_orphan(user.addresses[0]) def testwithrepeat(self): """tests a one-to-many eager load where we also query on joined criterion, where the joined @@ -1015,17 +883,6 @@ class EagerTest(MapperSuperTest): {'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])}, ) - def testcircular(self): - """test that a circular eager relationship breaks the cycle with a lazy loader""" - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy=False, backref=backref('user', lazy=False)) - )) - assert class_mapper(User).props['addresses'].lazy is False - assert class_mapper(Address).props['user'].lazy is False - session = create_session() - l = session.query(User).select() - self.assert_result(l, User, *user_address_result) - def testonselect(self): """test eager loading of a mapper which is against a select""" @@ -1072,61 +929,6 @@ class EagerTest(MapperSuperTest): } ) - def testdouble(self): - """tests eager loading with two relations simultaneously, from the same table. """ - openorders = alias(orders, 'openorders') - closedorders = alias(orders, 'closedorders') - ordermapper = mapper(Order, orders) - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False), - closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False) - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'addresses' : (Address, [{'address_id' : 1}]), - 'open_orders' : (Order, [{'order_id' : 3}]), - 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},]) - }, - {'user_id' : 8, - 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), - 'open_orders' : (Order, []), - 'closed_orders' : (Order, []) - }, - {'user_id' : 9, - 'addresses' : (Address, []), - 'open_orders' : (Order, [{'order_id' : 4}]), - 'closed_orders' : (Order, [{'order_id' : 2}]) - } - ) - - def testdoublewithscalar(self): - """tests eager loading with two relations from the same table, with one of them joining to the parent User. the other is the primary mapper. doesn't re-test addresses relation.""" - max_orders_by_user = select([func.max(orders.c.order_id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user') - max_orders = orders.select(orders.c.order_id==max_orders_by_user.c.order_id).alias('max_orders') - m = mapper(User, users, properties={ - 'orders':relation(mapper(Order, orders), backref='user', lazy=False), - 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False) - }) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]), - 'max_order' : (Order, {'order_id' : 5}) - }, - {'user_id' : 8, - 'orders' : (Order, []), - 'max_order' : None, - }, - {'user_id' : 9, - 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]), - 'max_order' : (Order, {'order_id' : 4}) - } - ) - def testnested(self): """tests eager loading of a parent item with two types of child items, where one of those child items eager loads its own child items.""" @@ -1142,71 +944,7 @@ class EagerTest(MapperSuperTest): l = q.select() self.assert_result(l, User, *user_all_result) - def testmanytomany(self): - items = orderitems - m = mapper(Item, items, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, Item, *item_keyword_result) - - l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - ) - - - def testmanytomanyoptions(self): - items = orderitems - m = mapper(Item, items, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]), - )) - q = create_session().query(m).options(eagerload('keywords')) - def go(): - l = q.select() - self.assert_result(l, Item, *item_keyword_result) - self.assert_sql_count(db, go, 1) - - def go(): - l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - ) - self.assert_sql_count(db, go, 1) - - def testoneandmany(self): - """tests eager load for a parent object with a child object that - contains a many-to-many relationship to a third object.""" - items = orderitems - - m = mapper(Item, items, - properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), - )) - - m = mapper(Order, orders, properties = dict( - items = relation(m, lazy = False) - )) - q = create_session().query(m) - l = q.select("orders.order_id in (1,2,3)") - self.assert_result(l, Order, - {'order_id' : 1, 'items': (Item, [])}, - {'order_id' : 2, 'items': (Item, [ - {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, - {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])} - ])}, - {'order_id' : 3, 'items': (Item, [ - {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, - {'item_id':4, 'item_name':'item 4'}, - {'item_id':5, 'item_name':'item 5'} - ])}, - ) - - if __name__ == "__main__": testbase.main()