def suite():
modules_to_test = (
- 'orm.attributes',
- 'orm.query',
- 'orm.lazy_relations',
+ 'orm.attributes',
+ 'orm.query',
+ 'orm.lazy_relations',
+ 'orm.eager_relations',
'orm.mapper',
'orm.generative',
'orm.lazytest1',
--- /dev/null
+"""basic tests of eager loaded attributes"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+
+from fixtures import *
+from query import QueryTest
+
+class EagerTest(QueryTest):
+ keep_mappers = False
+
+ def setup_mappers(self):
+ pass
+
+ def test_basic(self):
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False)
+ })
+ sess = create_session()
+ q = sess.query(User)
+
+ assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == q.all()
+
+ def test_no_orphan(self):
+ """test that an eagerly loaded child object is not marked as an orphan"""
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ user = sess.query(User).get(7)
+ assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
+ assert not class_mapper(Address)._is_orphan(user.addresses[0])
+
+ def test_orderby(self):
+ mapper(User, users, properties = {
+ 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address),
+ })
+ q = create_session().query(User)
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=2, email_address='ed@wood.com')
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == q.all()
+
+ def test_orderby_secondary(self):
+ """tests that a regular mapper select on a single table can order by a relation to a second table"""
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ ))
+
+ q = create_session().query(User)
+ l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all()
+
+ assert [
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ ] == l
+
+ def test_orderby_desc(self):
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False, order_by=[desc(addresses.c.email_address)]),
+ ))
+ sess = create_session()
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == sess.query(User).all()
+
+ def test_many_to_many(self):
+
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=False),
+ ))
+
+ q = create_session().query(Item)
+ def go():
+ assert [
+ Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+ Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=4, keywords=[]),
+ Item(id=5, keywords=[]),
+ ] == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def go():
+ assert [
+ Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+ ] == q.join('keywords').filter(keywords.c.name == 'red').all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+ def test_eager_option(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=True),
+ ))
+
+ q = create_session().query(Item)
+
+ def go():
+ assert [
+ Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+ ] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all()
+
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_cyclical(self):
+ """test that a circular eager relationship breaks the cycle with a lazy loader"""
+
+ mapper(Address, addresses)
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False, backref=backref('user', lazy=False))
+ ))
+ assert class_mapper(User).props['addresses'].lazy is False
+ assert class_mapper(Address).props['user'].lazy is False
+
+ sess = create_session()
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == sess.query(User).all()
+
+ def test_double(self):
+ """tests lazy loading with two relations simulatneously, from the same table, using aliases. """
+ openorders = alias(orders, 'openorders')
+ closedorders = alias(orders, 'closedorders')
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=False),
+ closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=False)
+ ))
+ q = create_session().query(User)
+
+ def go():
+ assert [
+ User(
+ id=7,
+ addresses=[Address(id=1)],
+ open_orders = [Order(id=3)],
+ closed_orders = [Order(id=1), Order(id=5)]
+ ),
+ User(
+ id=8,
+ addresses=[Address(id=2), Address(id=3), Address(id=4)],
+ open_orders = [],
+ closed_orders = []
+ ),
+ User(
+ id=9,
+ addresses=[Address(id=5)],
+ open_orders = [Order(id=4)],
+ closed_orders = [Order(id=2)]
+ ),
+ User(id=10)
+
+ ] == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_limit(self):
+ """test limit operations combined with lazy-load relationships."""
+
+ mapper(Item, items)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items, lazy=False)
+ })
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False),
+ 'orders':relation(Order, lazy=True)
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ if testbase.db.engine.name == 'mssql':
+ l = q.limit(2).all()
+ assert self.user_all_result[:2] == l
+ else:
+ l = q.limit(2).offset(1).all()
+ print l
+ print self.user_all_result[1:3]
+ assert self.user_all_result[1:3] == l
+
+ def test_distinct(self):
+ # this is an involved 3x union of the users table to get a lot of rows.
+ # then see if the "distinct" works its way out. you actually get the same
+ # result with or without the distinct, just via less or more rows.
+ u2 = users.alias('u2')
+ s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
+
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False),
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ def go():
+ l = q.filter(s.c.u2_id==User.c.id).distinct().all()
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_limit_2(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]),
+ ))
+
+ sess = create_session()
+ q = sess.query(Item)
+ l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\
+ order_by(Item.c.id).limit(2).all()
+
+ assert [
+ Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+ Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]),
+ ] == l
+
+ def test_limit_3(self):
+ """test that the ORDER BY is propigated from the inner select to the outer select, when using the
+ 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
+
+ mapper(Item, items)
+ mapper(Order, orders, properties = dict(
+ items = relation(Item, secondary=order_items, lazy=False)
+ ))
+
+ mapper(Address, addresses)
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ orders = relation(Order, lazy=False),
+ ))
+ sess = create_session()
+
+ q = sess.query(User)
+
+ if testbase.db.engine.name != 'mssql':
+ l = q.join('orders').order_by(desc(orders.c.user_id)).limit(2).offset(1)
+ assert [
+ User(id=9,
+ orders=[Order(id=2), Order(id=4)],
+ addresses=[Address(id=5)]
+ ),
+ User(id=7,
+ orders=[Order(id=1), Order(id=3), Order(id=5)],
+ addresses=[Address(id=1)]
+ )
+ ] == l.all()
+
+ l = q.join('addresses').order_by(desc(addresses.c.email_address)).limit(1).offset(0)
+ assert [
+ User(id=7,
+ orders=[Order(id=1), Order(id=3), Order(id=5)],
+ addresses=[Address(id=1)]
+ )
+ ] == l.all()
+
+ def test_one_to_many_scalar(self):
+ mapper(User, users, properties = dict(
+ address = relation(mapper(Address, addresses), lazy=False, uselist=False)
+ ))
+ q = create_session().query(User)
+
+ def go():
+ l = q.filter(users.c.id == 7).all()
+ assert [User(id=7, address=Address(id=1))] == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_many_to_one(self):
+ mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy=False)
+ ))
+ sess = create_session()
+ q = sess.query(Address)
+
+ def go():
+ a = q.filter(addresses.c.id==1).one()
+ assert a.user is not None
+ u1 = sess.query(User).get(7)
+ assert a.user is u1
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+ def test_one_and_many(self):
+ """tests eager load for a parent object with a child object that
+ contains a many-to-many relationship to a third object."""
+
+ mapper(User, users, properties={
+ 'orders':relation(Order, lazy=False)
+ })
+ mapper(Item, items)
+ mapper(Order, orders, properties = dict(
+ items = relation(Item, secondary=order_items, lazy=False)
+ ))
+
+ q = create_session().query(User)
+
+ l = q.filter("users.id in (7, 8, 9)")
+
+ def go():
+ assert [
+ User(id=7, orders=[
+ Order(id=1, items=[Item(id=1), Item(id=2), Item(id=3)]),
+ Order(id=3, items=[Item(id=3), Item(id=4), Item(id=5)]),
+ Order(id=5, items=[Item(id=5)]),
+ ]),
+ User(id=8, orders=[]),
+ User(id=9, orders=[
+ Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)]),
+ Order(id=4, items=[Item(id=1), Item(id=5)]),
+ ]),
+ ] == l.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_double_with_aggregate(self):
+
+ max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
+
+ max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders')
+
+ mapper(Order, orders)
+ mapper(User, users, properties={
+ 'orders':relation(Order, backref='user', lazy=False),
+ 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
+ })
+ q = create_session().query(User)
+ def go():
+ assert [
+ User(id=7, orders=[
+ Order(id=1),
+ Order(id=3),
+ Order(id=5),
+ ],
+ max_order=Order(id=5)
+ ),
+ User(id=8, orders=[]),
+ User(id=9, orders=[Order(id=2),Order(id=4)],
+ max_order=Order(id=4)
+ ),
+ User(id=10),
+ ] == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+if __name__ == '__main__':
+ testbase.main()
from sqlalchemy import *
from testbase import Table, Column
+_recursion_stack = util.Set()
class Base(object):
def __init__(self, **kwargs):
for k in kwargs:
only look at attributes that are present on the source object.
"""
- # use __dict__ to avoid instrumented properties
- for attr in self.__dict__.keys():
- if attr[0] == '_':
- continue
- value = getattr(self, attr)
- if hasattr(value, '__iter__') and not isinstance(value, basestring):
- if len(value) == 0:
+
+ if self in _recursion_stack:
+ return True
+ _recursion_stack.add(self)
+ try:
+ # use __dict__ to avoid instrumented properties
+ for attr in self.__dict__.keys():
+ if attr[0] == '_':
continue
- for (us, them) in zip(value, getattr(other, attr)):
- if us != them:
- return False
+ value = getattr(self, attr)
+ if hasattr(value, '__iter__') and not isinstance(value, basestring):
+ if len(value) == 0:
+ continue
+ for (us, them) in zip(value, getattr(other, attr)):
+ if us != them:
+ return False
+ else:
+ continue
else:
- continue
+ if value is not None:
+ if value != getattr(other, attr):
+ return False
else:
- if value is not None:
- if value != getattr(other, attr):
- return False
- else:
- return True
-
+ return True
+ finally:
+ _recursion_stack.remove(self)
+
class User(Base):pass
class Order(Base):pass
class Item(Base):pass
self.assert_result(l, User, *user_address_result)
class EagerTest(MapperSuperTest):
- def testbasic(self):
- """tests a basic one-to-many eager load"""
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User, *user_address_result)
-
- def testorderby(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False, order_by=addresses.c.email_address),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])},
- {'user_id' : 9, 'addresses' : (Address, [])}
- )
-
-
- def testorderby_desc(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]),
- ))
- q = create_session().query(m)
- l = q.select()
-
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}, ])},
- {'user_id' : 9, 'addresses' : (Address, [])},
- )
-
- def testlimit(self):
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = False)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ))
- sess = create_session()
- q = sess.query(m)
-
- if db.engine.name == 'mssql':
- l = q.select(limit=2)
- self.assert_result(l, User, *user_all_result[:2])
- else:
- l = q.select(limit=2, offset=1)
- self.assert_result(l, User, *user_all_result[1:3])
-
- # this is an involved 3x union of the users table to get a lot of rows.
- # then see if the "distinct" works its way out. you actually get the same
- # result with or without the distinct, just via less or more rows.
- u2 = users.alias('u2')
- s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
- l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True)
- self.assert_result(l, User, *user_all_result)
- sess.clear()
- clear_mappers()
- m = mapper(Item, orderitems, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
- ))
- q = sess.query(m)
- l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2)
- self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
-
- def testmorelimit(self):
- """test that the ORDER BY is propigated from the inner select to the outer select, when using the
- 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = False)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ))
- sess = create_session()
- q = sess.query(m)
-
- if db.engine.name != 'mssql':
- l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
- self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
-
- l = q.select(q.join_to('addresses'), order_by=desc(addresses.c.email_address), limit=1, offset=0)
- self.assert_result(l, User, *(user_all_result[0],))
-
- def testonetoone(self):
- m = mapper(User, users, properties = dict(
- address = relation(mapper(Address, addresses), lazy = False, uselist = False)
- ))
- q = create_session().query(m)
- l = q.select(users.c.user_id == 7)
- self.assert_result(l, User,
- {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})},
- )
-
- def testbackwardsonetoone(self):
- m = mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy = False)
- )).compile()
- self.echo(repr(m.props['user'].uselist))
- q = create_session().query(m)
- l = q.select(addresses.c.address_id == 1)
- self.assert_result(l, Address,
- {'address_id' : 1, 'email_address' : 'jack@bean.com',
- 'user' : (User, {'user_id' : 7, 'user_name' : 'jack'})
- },
- )
-
- def testorphanstate(self):
- """test that an eagerly loaded child object is not marked as an orphan"""
- m = mapper(User, users, properties={
- 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
- })
- mapper(Address, addresses)
-
- s = create_session()
- q = s.query(m)
- user = q.get(7)
- assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
- assert not class_mapper(Address)._is_orphan(user.addresses[0])
def testwithrepeat(self):
"""tests a one-to-many eager load where we also query on joined criterion, where the joined
{'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])},
)
- def testcircular(self):
- """test that a circular eager relationship breaks the cycle with a lazy loader"""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy=False, backref=backref('user', lazy=False))
- ))
- assert class_mapper(User).props['addresses'].lazy is False
- assert class_mapper(Address).props['user'].lazy is False
- session = create_session()
- l = session.query(User).select()
- self.assert_result(l, User, *user_address_result)
-
def testonselect(self):
"""test eager loading of a mapper which is against a select"""
}
)
- def testdouble(self):
- """tests eager loading with two relations simultaneously, from the same table. """
- openorders = alias(orders, 'openorders')
- closedorders = alias(orders, 'closedorders')
- ordermapper = mapper(Order, orders)
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False),
- closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False)
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'addresses' : (Address, [{'address_id' : 1}]),
- 'open_orders' : (Order, [{'order_id' : 3}]),
- 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},])
- },
- {'user_id' : 8,
- 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
- 'open_orders' : (Order, []),
- 'closed_orders' : (Order, [])
- },
- {'user_id' : 9,
- 'addresses' : (Address, []),
- 'open_orders' : (Order, [{'order_id' : 4}]),
- 'closed_orders' : (Order, [{'order_id' : 2}])
- }
- )
-
- def testdoublewithscalar(self):
- """tests eager loading with two relations from the same table, with one of them joining to the parent User. the other is the primary mapper. doesn't re-test addresses relation."""
- max_orders_by_user = select([func.max(orders.c.order_id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
- max_orders = orders.select(orders.c.order_id==max_orders_by_user.c.order_id).alias('max_orders')
- m = mapper(User, users, properties={
- 'orders':relation(mapper(Order, orders), backref='user', lazy=False),
- 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
- })
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]),
- 'max_order' : (Order, {'order_id' : 5})
- },
- {'user_id' : 8,
- 'orders' : (Order, []),
- 'max_order' : None,
- },
- {'user_id' : 9,
- 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]),
- 'max_order' : (Order, {'order_id' : 4})
- }
- )
-
def testnested(self):
"""tests eager loading of a parent item with two types of child items,
where one of those child items eager loads its own child items."""
l = q.select()
self.assert_result(l, User, *user_all_result)
- def testmanytomany(self):
- items = orderitems
- m = mapper(Item, items, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, Item, *item_keyword_result)
-
- l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- )
-
-
- def testmanytomanyoptions(self):
- items = orderitems
- m = mapper(Item, items, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]),
- ))
- q = create_session().query(m).options(eagerload('keywords'))
- def go():
- l = q.select()
- self.assert_result(l, Item, *item_keyword_result)
- self.assert_sql_count(db, go, 1)
-
- def go():
- l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- )
- self.assert_sql_count(db, go, 1)
-
- def testoneandmany(self):
- """tests eager load for a parent object with a child object that
- contains a many-to-many relationship to a third object."""
- items = orderitems
-
- m = mapper(Item, items,
- properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
- ))
-
- m = mapper(Order, orders, properties = dict(
- items = relation(m, lazy = False)
- ))
- q = create_session().query(m)
- l = q.select("orders.order_id in (1,2,3)")
- self.assert_result(l, Order,
- {'order_id' : 1, 'items': (Item, [])},
- {'order_id' : 2, 'items': (Item, [
- {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])},
- {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])}
- ])},
- {'order_id' : 3, 'items': (Item, [
- {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])},
- {'item_id':4, 'item_name':'item 4'},
- {'item_id':5, 'item_name':'item 5'}
- ])},
- )
-
-
if __name__ == "__main__":
testbase.main()