[ticket:597], and are constructed with a thunk instead
- orm
- forwards-compatibility with 0.4: added one(), first(), and
- all() to Query
+ all() to Query. almost all Query functionality from 0.4 is
+ present in 0.3.9 for forwards-compat purposes.
+ - reset_joinpoint() really really works this time, promise ! lets
+ you re-join from the root:
+ query.join(['a', 'b']).filter(<crit>).reset_joinpoint().\
+ join(['a', 'c']).filter(<some other crit>).all()
+ in 0.4 all join() calls start from the "root"
- added synchronization to the mapper() construction step, to avoid
thread collections when pre-existing mappers are compiling in a
different thread [ticket:613]
"""
return self.filter(self._join_by(args, kwargs, start=self._joinpoint))
- def _join_to(self, prop, outerjoin=False):
+ def _join_to(self, prop, outerjoin=False, start=None):
+ if start is None:
+ start = self._joinpoint
+
if isinstance(prop, list):
keys = prop
else:
- [keys,p] = self._locate_prop(prop, start=self._joinpoint)
+ [keys,p] = self._locate_prop(prop, start=start)
+
clause = self._from_obj[-1]
- mapper = self._joinpoint
+
+ currenttables = [clause]
+ class FindJoinedTables(sql.NoColumnVisitor):
+ def visit_join(self, join):
+ currenttables.append(join.left)
+ currenttables.append(join.right)
+ FindJoinedTables().traverse(clause)
+
+ mapper = start
for key in keys:
prop = mapper.get_property(key, resolve_synonyms=True)
if prop._is_self_referential():
raise exceptions.InvalidRequestError("Self-referential query on '%s' property must be constructed manually using an Alias object for the related table." % str(prop))
- if outerjoin:
- if prop.secondary:
- clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
- clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False))
- else:
- clause = clause.outerjoin(prop.select_table, prop.get_join(mapper))
- else:
- if prop.secondary:
- clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
- clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False))
+ # dont re-join to a table already in our from objects
+ if prop.select_table not in currenttables:
+ if outerjoin:
+ if prop.secondary:
+ clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
+ clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False))
+ else:
+ clause = clause.outerjoin(prop.select_table, prop.get_join(mapper))
else:
- clause = clause.join(prop.select_table, prop.get_join(mapper))
+ if prop.secondary:
+ clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
+ clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False))
+ else:
+ clause = clause.join(prop.select_table, prop.get_join(mapper))
mapper = prop.mapper
return (clause, mapper)
to be released in 0.4."""
q = self._clone()
- q._joinpoint = q._mapper
+ q._joinpoint = q.mapper
return q
def select_from(self, from_obj):
modules_to_test = (
'orm.attributes',
'orm.mapper',
+ 'orm.query',
+ 'orm.lazy_relations',
+ 'orm.eager_relations',
'orm.generative',
'orm.lazytest1',
'orm.eagertest1',
--- /dev/null
+"""basic tests of eager loaded attributes"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+
+from fixtures import *
+from query import QueryTest
+
+class EagerTest(QueryTest):
+ keep_mappers = False
+
+ def setup_mappers(self):
+ pass
+
+ def test_basic(self):
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False)
+ })
+ sess = create_session()
+ q = sess.query(User)
+
+ assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+ assert fixtures.user_address_result == q.all()
+
+ def test_no_orphan(self):
+ """test that an eagerly loaded child object is not marked as an orphan"""
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ user = sess.query(User).get(7)
+ assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
+ assert not class_mapper(Address)._is_orphan(user.addresses[0])
+
+ def test_orderby(self):
+ mapper(User, users, properties = {
+ 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address),
+ })
+ q = create_session().query(User)
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=2, email_address='ed@wood.com')
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == q.all()
+
+ def test_orderby_secondary(self):
+ """tests that a regular mapper select on a single table can order by a relation to a second table"""
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ ))
+
+ q = create_session().query(User)
+ l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all()
+
+ assert [
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ ] == l
+
+ def test_orderby_desc(self):
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False, order_by=[desc(addresses.c.email_address)]),
+ ))
+ sess = create_session()
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == sess.query(User).all()
+
+ def test_many_to_many(self):
+
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=keywords.c.id),
+ ))
+
+ q = create_session().query(Item)
+ def go():
+ assert fixtures.item_keyword_result == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def go():
+ assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+ def test_eager_option(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=True),
+ ))
+
+ q = create_session().query(Item)
+
+ def go():
+ assert fixtures.item_keyword_result[0:2] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all()
+
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_cyclical(self):
+ """test that a circular eager relationship breaks the cycle with a lazy loader"""
+
+ mapper(Address, addresses)
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False, backref=backref('user', lazy=False))
+ ))
+ assert class_mapper(User).get_property('addresses').lazy is False
+ assert class_mapper(Address).get_property('user').lazy is False
+
+ sess = create_session()
+ assert fixtures.user_address_result == sess.query(User).all()
+
+ def test_double(self):
+ """tests lazy loading with two relations simulatneously, from the same table, using aliases. """
+ openorders = alias(orders, 'openorders')
+ closedorders = alias(orders, 'closedorders')
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=False),
+ closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=False)
+ ))
+ q = create_session().query(User)
+
+ def go():
+ assert [
+ User(
+ id=7,
+ addresses=[Address(id=1)],
+ open_orders = [Order(id=3)],
+ closed_orders = [Order(id=1), Order(id=5)]
+ ),
+ User(
+ id=8,
+ addresses=[Address(id=2), Address(id=3), Address(id=4)],
+ open_orders = [],
+ closed_orders = []
+ ),
+ User(
+ id=9,
+ addresses=[Address(id=5)],
+ open_orders = [Order(id=4)],
+ closed_orders = [Order(id=2)]
+ ),
+ User(id=10)
+
+ ] == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_limit(self):
+ """test limit operations combined with lazy-load relationships."""
+
+ mapper(Item, items)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items, lazy=False)
+ })
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False),
+ 'orders':relation(Order, lazy=True)
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ if testbase.db.engine.name == 'mssql':
+ l = q.limit(2).all()
+ assert fixtures.user_all_result[:2] == l
+ else:
+ l = q.limit(2).offset(1).all()
+ assert fixtures.user_all_result[1:3] == l
+
+ def test_distinct(self):
+ # this is an involved 3x union of the users table to get a lot of rows.
+ # then see if the "distinct" works its way out. you actually get the same
+ # result with or without the distinct, just via less or more rows.
+ u2 = users.alias('u2')
+ s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
+
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False),
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ def go():
+ l = q.filter(s.c.u2_id==User.c.id).distinct().all()
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_limit_2(self):
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]),
+ ))
+
+ sess = create_session()
+ q = sess.query(Item)
+ l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\
+ order_by(Item.c.id).limit(2).all()
+
+ assert fixtures.item_keyword_result[1:3] == l
+
+ def test_limit_3(self):
+ """test that the ORDER BY is propigated from the inner select to the outer select, when using the
+ 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
+
+ mapper(Item, items)
+ mapper(Order, orders, properties = dict(
+ items = relation(Item, secondary=order_items, lazy=False)
+ ))
+
+ mapper(Address, addresses)
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=False),
+ orders = relation(Order, lazy=False),
+ ))
+ sess = create_session()
+
+ q = sess.query(User)
+
+ if testbase.db.engine.name != 'mssql':
+ l = q.join('orders').order_by(desc(orders.c.user_id)).limit(2).offset(1)
+ assert [
+ User(id=9,
+ orders=[Order(id=2), Order(id=4)],
+ addresses=[Address(id=5)]
+ ),
+ User(id=7,
+ orders=[Order(id=1), Order(id=3), Order(id=5)],
+ addresses=[Address(id=1)]
+ )
+ ] == l.all()
+
+ l = q.join('addresses').order_by(desc(addresses.c.email_address)).limit(1).offset(0)
+ assert [
+ User(id=7,
+ orders=[Order(id=1), Order(id=3), Order(id=5)],
+ addresses=[Address(id=1)]
+ )
+ ] == l.all()
+
+ def test_one_to_many_scalar(self):
+ mapper(User, users, properties = dict(
+ address = relation(mapper(Address, addresses), lazy=False, uselist=False)
+ ))
+ q = create_session().query(User)
+
+ def go():
+ l = q.filter(users.c.id == 7).all()
+ assert [User(id=7, address=Address(id=1))] == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_many_to_one(self):
+ mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy=False)
+ ))
+ sess = create_session()
+ q = sess.query(Address)
+
+ def go():
+ a = q.filter(addresses.c.id==1).one()
+ assert a.user is not None
+ u1 = sess.query(User).get(7)
+ assert a.user is u1
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+ def test_one_and_many(self):
+ """tests eager load for a parent object with a child object that
+ contains a many-to-many relationship to a third object."""
+
+ mapper(User, users, properties={
+ 'orders':relation(Order, lazy=False)
+ })
+ mapper(Item, items)
+ mapper(Order, orders, properties = dict(
+ items = relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)
+ ))
+
+ q = create_session().query(User)
+
+ l = q.filter("users.id in (7, 8, 9)")
+
+ def go():
+ assert fixtures.user_order_result[0:3] == l.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_double_with_aggregate(self):
+
+ max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
+
+ max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders')
+
+ mapper(Order, orders)
+ mapper(User, users, properties={
+ 'orders':relation(Order, backref='user', lazy=False),
+ 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
+ })
+ q = create_session().query(User)
+
+ def go():
+ assert [
+ User(id=7, orders=[
+ Order(id=1),
+ Order(id=3),
+ Order(id=5),
+ ],
+ max_order=Order(id=5)
+ ),
+ User(id=8, orders=[]),
+ User(id=9, orders=[Order(id=2),Order(id=4)],
+ max_order=Order(id=4)
+ ),
+ User(id=10),
+ ] == q.all()
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_wide(self):
+ mapper(Order, orders, properties={'items':relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)})
+ mapper(Item, items)
+ mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy = False),
+ orders = relation(Order, lazy = False),
+ ))
+ q = create_session().query(User)
+ l = q.select()
+ assert fixtures.user_all_result == q.all()
+
+ def test_against_select(self):
+ """test eager loading of a mapper which is against a select"""
+
+ s = select([orders], orders.c.isopen==1).alias('openorders')
+
+ mapper(Order, s, properties={
+ 'user':relation(User, lazy=False)
+ })
+ mapper(User, users)
+
+ q = create_session().query(Order)
+ assert [
+ Order(id=3, user=User(id=7)),
+ Order(id=4, user=User(id=9))
+ ] == q.all()
+
+ q = q.select_from(s.join(order_items).join(items)).filter(~items.c.id.in_(1, 2, 5))
+ assert [
+ Order(id=3, user=User(id=7)),
+ ] == q.all()
+
+ def test_aliasing(self):
+ """test that eager loading uses aliases to insulate the eager load from regular criterion against those tables."""
+
+ mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses), lazy=False)
+ ))
+ q = create_session().query(User)
+ l = q.filter(addresses.c.email_address == 'ed@lala.com').filter(addresses.c.user_id==users.c.id)
+ assert fixtures.user_address_result[1:2] == l.all()
+
+if __name__ == '__main__':
+ testbase.main()
--- /dev/null
+from sqlalchemy import *
+
+_recursion_stack = util.Set()
+class Base(object):
+ def __init__(self, **kwargs):
+ for k in kwargs:
+ setattr(self, k, kwargs[k])
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ """'passively' compare this object to another.
+
+ only look at attributes that are present on the source object.
+
+ """
+
+ if self in _recursion_stack:
+ return True
+ _recursion_stack.add(self)
+ try:
+ # use __dict__ to avoid instrumented properties
+ for attr in self.__dict__.keys():
+ if attr[0] == '_':
+ continue
+ value = getattr(self, attr)
+ if hasattr(value, '__iter__') and not isinstance(value, basestring):
+ if len(value) == 0:
+ continue
+ for (us, them) in zip(value, getattr(other, attr)):
+ if us != them:
+ return False
+ else:
+ continue
+ else:
+ if value is not None:
+ if value != getattr(other, attr):
+ return False
+ else:
+ return True
+ finally:
+ _recursion_stack.remove(self)
+
+class User(Base):pass
+class Order(Base):pass
+class Item(Base):pass
+class Keyword(Base):pass
+class Address(Base):pass
+
+metadata = MetaData()
+
+users = Table('users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False))
+
+orders = Table('orders', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('user_id', None, ForeignKey('users.id')),
+ Column('address_id', None, ForeignKey('addresses.id')),
+ Column('description', String(30)),
+ Column('isopen', Integer)
+ )
+
+addresses = Table('addresses', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('user_id', None, ForeignKey('users.id')),
+ Column('email_address', String(50), nullable=False))
+
+items = Table('items', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('description', String(30), nullable=False)
+ )
+
+order_items = Table('order_items', metadata,
+ Column('item_id', None, ForeignKey('items.id')),
+ Column('order_id', None, ForeignKey('orders.id')))
+
+item_keywords = Table('item_keywords', metadata,
+ Column('item_id', None, ForeignKey('items.id')),
+ Column('keyword_id', None, ForeignKey('keywords.id')))
+
+keywords = Table('keywords', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False)
+ )
+
+def install_fixture_data():
+ users.insert().execute(
+ dict(id = 7, name = 'jack'),
+ dict(id = 8, name = 'ed'),
+ dict(id = 9, name = 'fred'),
+ dict(id = 10, name = 'chuck'),
+
+ )
+ addresses.insert().execute(
+ dict(id = 1, user_id = 7, email_address = "jack@bean.com"),
+ dict(id = 2, user_id = 8, email_address = "ed@wood.com"),
+ dict(id = 3, user_id = 8, email_address = "ed@bettyboop.com"),
+ dict(id = 4, user_id = 8, email_address = "ed@lala.com"),
+ dict(id = 5, user_id = 9, email_address = "fred@fred.com"),
+ )
+ orders.insert().execute(
+ dict(id = 1, user_id = 7, description = 'order 1', isopen=0, address_id=1),
+ dict(id = 2, user_id = 9, description = 'order 2', isopen=0, address_id=4),
+ dict(id = 3, user_id = 7, description = 'order 3', isopen=1, address_id=1),
+ dict(id = 4, user_id = 9, description = 'order 4', isopen=1, address_id=4),
+ dict(id = 5, user_id = 7, description = 'order 5', isopen=0, address_id=1)
+ )
+ items.insert().execute(
+ dict(id=1, description='item 1'),
+ dict(id=2, description='item 2'),
+ dict(id=3, description='item 3'),
+ dict(id=4, description='item 4'),
+ dict(id=5, description='item 5'),
+ )
+ order_items.insert().execute(
+ dict(item_id=1, order_id=1),
+ dict(item_id=2, order_id=1),
+ dict(item_id=3, order_id=1),
+
+ dict(item_id=1, order_id=2),
+ dict(item_id=2, order_id=2),
+ dict(item_id=3, order_id=2),
+
+ dict(item_id=3, order_id=3),
+ dict(item_id=4, order_id=3),
+ dict(item_id=5, order_id=3),
+
+ dict(item_id=1, order_id=4),
+ dict(item_id=5, order_id=4),
+
+ dict(item_id=5, order_id=5),
+ )
+ keywords.insert().execute(
+ dict(id=1, name='blue'),
+ dict(id=2, name='red'),
+ dict(id=3, name='green'),
+ dict(id=4, name='big'),
+ dict(id=5, name='small'),
+ dict(id=6, name='round'),
+ dict(id=7, name='square')
+ )
+
+ # this many-to-many table has the keywords inserted
+ # in primary key order, to appease the unit tests.
+ # this is because postgres, oracle, and sqlite all support
+ # true insert-order row id, but of course our pal MySQL does not,
+ # so the best it can do is order by, well something, so there you go.
+ item_keywords.insert().execute(
+ dict(keyword_id=2, item_id=1),
+ dict(keyword_id=2, item_id=2),
+ dict(keyword_id=4, item_id=1),
+ dict(keyword_id=6, item_id=1),
+ dict(keyword_id=5, item_id=2),
+ dict(keyword_id=3, item_id=3),
+ dict(keyword_id=4, item_id=3),
+ dict(keyword_id=7, item_id=2),
+ dict(keyword_id=6, item_id=3)
+ )
+
+class Fixtures(object):
+ @property
+ def user_address_result(self):
+ return [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ]
+
+ @property
+ def user_all_result(self):
+ return [
+ User(id=7, addresses=[
+ Address(id=1)
+ ], orders=[
+ Order(description='order 1', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]),
+ Order(description='order 3'),
+ Order(description='order 5'),
+ ]),
+ User(id=8, addresses=[
+ Address(id=2),
+ Address(id=3),
+ Address(id=4)
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ], orders=[
+ Order(description='order 2', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]),
+ Order(description='order 4', items=[Item(description='item 1'), Item(description='item 5')]),
+ ]),
+ User(id=10, addresses=[])
+ ]
+
+ @property
+ def user_order_result(self):
+ return [
+ User(id=7, orders=[
+ Order(id=1, items=[Item(id=1), Item(id=2), Item(id=3)]),
+ Order(id=3, items=[Item(id=3), Item(id=4), Item(id=5)]),
+ Order(id=5, items=[Item(id=5)]),
+ ]),
+ User(id=8, orders=[]),
+ User(id=9, orders=[
+ Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)]),
+ Order(id=4, items=[Item(id=1), Item(id=5)]),
+ ]),
+ User(id=10)
+ ]
+
+ @property
+ def item_keyword_result(self):
+ return [
+ Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+ Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]),
+ Item(id=4, keywords=[]),
+ Item(id=5, keywords=[]),
+ ]
+fixtures = Fixtures()
--- /dev/null
+"""basic tests of lazy loaded attributes"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+
+from fixtures import *
+from query import QueryTest
+
+class LazyTest(QueryTest):
+ keep_mappers = False
+
+ def setup_mappers(self):
+ pass
+
+ def test_basic(self):
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=True)
+ })
+ sess = create_session()
+ q = sess.query(User)
+ assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+
+ def test_bindstosession(self):
+ """test that lazy loaders use the mapper's contextual session if the parent instance
+ is not in a session, and that an error is raised if no contextual session"""
+
+ from sqlalchemy.ext.sessioncontext import SessionContext
+ ctx = SessionContext(create_session)
+ m = mapper(User, users, properties = dict(
+ addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True)
+ ), extension=ctx.mapper_extension)
+ q = ctx.current.query(m)
+ u = q.filter(users.c.id == 7).first()
+ ctx.current.expunge(u)
+ assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u
+
+ clear_mappers()
+
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=True)
+ })
+ try:
+ sess = create_session()
+ q = sess.query(User)
+ u = q.filter(users.c.id == 7).first()
+ sess.expunge(u)
+ assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u
+ except exceptions.InvalidRequestError, err:
+ assert "not bound to a Session, and no contextual session" in str(err)
+
+ def test_orderby(self):
+ mapper(User, users, properties = {
+ 'addresses':relation(mapper(Address, addresses), lazy=True, order_by=addresses.c.email_address),
+ })
+ q = create_session().query(User)
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=2, email_address='ed@wood.com')
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == q.all()
+
+ def test_orderby_secondary(self):
+ """tests that a regular mapper select on a single table can order by a relation to a second table"""
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=True),
+ ))
+ q = create_session().query(User)
+ l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all()
+ assert [
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ ] == l
+
+ def test_orderby_desc(self):
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy=True, order_by=[desc(addresses.c.email_address)]),
+ ))
+ sess = create_session()
+ assert [
+ User(id=7, addresses=[
+ Address(id=1)
+ ]),
+ User(id=8, addresses=[
+ Address(id=2, email_address='ed@wood.com'),
+ Address(id=4, email_address='ed@lala.com'),
+ Address(id=3, email_address='ed@bettyboop.com'),
+ ]),
+ User(id=9, addresses=[
+ Address(id=5)
+ ]),
+ User(id=10, addresses=[])
+ ] == sess.query(User).all()
+
+ def test_no_orphan(self):
+ """test that a lazily loaded child object is not marked as an orphan"""
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address, cascade="all,delete-orphan", lazy=True)
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ user = sess.query(User).get(7)
+ assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
+ assert not class_mapper(Address)._is_orphan(user.addresses[0])
+
+ def test_limit(self):
+ """test limit operations combined with lazy-load relationships."""
+
+ mapper(Item, items)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items, lazy=True)
+ })
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=True),
+ 'orders':relation(Order, lazy=True)
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ if testbase.db.engine.name == 'mssql':
+ l = q.limit(2).all()
+ assert fixtures.user_all_result[:2] == l
+ else:
+ l = q.limit(2).offset(1).all()
+ assert fixtures.user_all_result[1:3] == l
+
+ def test_distinct(self):
+ mapper(Item, items)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items, lazy=True)
+ })
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=True),
+ 'orders':relation(Order, lazy=True)
+ })
+
+ sess = create_session()
+ q = sess.query(User)
+
+ # use a union all to get a lot of rows to join against
+ u2 = users.alias('u2')
+ s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
+ print [key for key in s.c.keys()]
+ l = q.filter(s.c.u2_id==User.c.id).distinct().all()
+ assert fixtures.user_all_result == l
+
+ def test_one_to_many_scalar(self):
+ mapper(User, users, properties = dict(
+ address = relation(mapper(Address, addresses), lazy=True, uselist=False)
+ ))
+ q = create_session().query(User)
+ l = q.filter(users.c.id == 7).all()
+ assert [User(id=7, address=Address(id=1))] == l
+
+ def test_double(self):
+ """tests lazy loading with two relations simulatneously, from the same table, using aliases. """
+ openorders = alias(orders, 'openorders')
+ closedorders = alias(orders, 'closedorders')
+
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(
+ addresses = relation(Address, lazy = True),
+ open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True),
+ closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=True)
+ ))
+ q = create_session().query(User)
+
+ assert [
+ User(
+ id=7,
+ addresses=[Address(id=1)],
+ open_orders = [Order(id=3)],
+ closed_orders = [Order(id=1), Order(id=5)]
+ ),
+ User(
+ id=8,
+ addresses=[Address(id=2), Address(id=3), Address(id=4)],
+ open_orders = [],
+ closed_orders = []
+ ),
+ User(
+ id=9,
+ addresses=[Address(id=5)],
+ open_orders = [Order(id=4)],
+ closed_orders = [Order(id=2)]
+ ),
+ User(id=10)
+
+ ] == q.all()
+
+ def test_many_to_many(self):
+
+ mapper(Keyword, keywords)
+ mapper(Item, items, properties = dict(
+ keywords = relation(Keyword, secondary=item_keywords, lazy=True),
+ ))
+
+ q = create_session().query(Item)
+ assert fixtures.item_keyword_result == q.all()
+
+ assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all()
+
+ def test_uses_get(self):
+ """test that a simple many-to-one lazyload optimizes to use query.get()."""
+
+ mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy=True)
+ ))
+
+ sess = create_session()
+
+ # load address
+ a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one()
+
+ # load user that is attached to the address
+ u1 = sess.query(User).get(8)
+
+ def go():
+ # lazy load of a1.user should get it from the session
+ assert a1.user is u1
+ self.assert_sql_count(testbase.db, go, 0)
+
+ def test_many_to_one(self):
+ mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy=True)
+ ))
+ sess = create_session()
+ q = sess.query(Address)
+ a = q.filter(addresses.c.id==1).one()
+
+ assert a.user is not None
+
+ u1 = sess.query(User).get(7)
+
+ assert a.user is u1
+
+if __name__ == '__main__':
+ testbase.main()
)
-class LazyTest(MapperSuperTest):
-
- def testbasic(self):
- """tests a basic one-to-many lazy load"""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True)
- ))
- q = create_session().query(m)
- l = q.select(users.c.user_id == 7)
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
- )
-
- def testbindstosession(self):
- ctx = SessionContext(create_session)
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True)
- ), extension=ctx.mapper_extension)
- q = ctx.current.query(m)
- u = q.filter(users.c.user_id == 7).selectfirst()
- ctx.current.expunge(u)
- self.assert_result([u], User,
- {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
- )
-
- def testcreateinstance(self):
- class Ext(MapperExtension):
- def create_instance(self, *args, **kwargs):
- return User()
- m = mapper(Address, addresses)
- m = mapper(User, users, extension=Ext(), properties = dict(
- addresses = relation(Address, lazy=True),
- ))
-
- q = create_session().query(m)
- l = q.select();
- self.assert_result(l, User, *user_address_result)
-
- def testorderby(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = True, order_by=addresses.c.email_address),
- ))
- q = create_session().query(m)
- l = q.select()
-
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])},
- {'user_id' : 9, 'addresses' : (Address, [])}
- )
-
- def testorderby_select(self):
- """tests that a regular mapper select on a single table can order by a relation to a second table"""
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = True),
- ))
- q = create_session().query(m)
- l = q.select(users.c.user_id==addresses.c.user_id, order_by=addresses.c.email_address)
-
- self.assert_result(l, User,
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, ])},
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- )
-
- def testorderby_desc(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = True, order_by=[desc(addresses.c.email_address)]),
- ))
- q = create_session().query(m)
- l = q.select()
-
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}])},
- {'user_id' : 9, 'addresses' : (Address, [])},
- )
-
- def testorphanstate(self):
- """test that a lazily loaded child object is not marked as an orphan"""
- m = mapper(User, users, properties={
- 'addresses':relation(Address, cascade="all,delete-orphan", lazy=True)
- })
- mapper(Address, addresses)
-
- q = create_session().query(m)
- user = q.get(7)
- assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
- assert not class_mapper(Address)._is_orphan(user.addresses[0])
-
- def testlimit(self):
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = True)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = True),
- ))
- sess= create_session()
- q = sess.query(m)
-
- if db.engine.name == 'mssql':
- l = q.select(limit=2)
- self.assert_result(l, User, *user_all_result[:2])
- else:
- l = q.select(limit=2, offset=1)
- self.assert_result(l, User, *user_all_result[1:3])
-
- # use a union all to get a lot of rows to join against
- u2 = users.alias('u2')
- s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
- print [key for key in s.c.keys()]
- l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True)
- self.assert_result(l, User, *user_all_result)
-
- sess.clear()
- clear_mappers()
- m = mapper(Item, orderitems, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),
- ))
-
- l = sess.query(m).select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2)
- self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
-
- def testonetoone(self):
- m = mapper(User, users, properties = dict(
- address = relation(mapper(Address, addresses), lazy = True, uselist = False)
- ))
- q = create_session().query(m)
- l = q.select(users.c.user_id == 7)
- self.assert_result(l, User, {'user_id':7, 'address' : (Address, {'address_id':1})})
-
- def testbackwardsonetoone(self):
- m = mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy = True)
- ))
- q = create_session().query(m)
- l = q.select(addresses.c.address_id == 1)
- self.echo(repr(l))
- print repr(l[0].__dict__)
- self.echo(repr(l[0].user))
- self.assert_(l[0].user is not None)
-
- def testuseget(self):
- """test that a simple many-to-one lazyload optimizes to use query.get().
-
- this is done currently by comparing the 'get' SQL clause of the query
- to the 'lazy' SQL clause of the lazy loader, so it relies heavily on
- ClauseElement.compare()"""
-
- m = mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy = True)
- ))
- sess = create_session()
- a1 = sess.query(Address).get_by(email_address = "ed@wood.com")
- u1 = sess.query(User).get(8)
- def go():
- assert a1.user is u1
- self.assert_sql_count(db, go, 0)
-
- def testdouble(self):
- """tests lazy loading with two relations simulatneously, from the same table, using aliases. """
- openorders = alias(orders, 'openorders')
- closedorders = alias(orders, 'closedorders')
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = True),
- open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = True),
- closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = True)
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'addresses' : (Address, [{'address_id' : 1}]),
- 'open_orders' : (Order, [{'order_id' : 3}]),
- 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},])
- },
- {'user_id' : 8,
- 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
- 'open_orders' : (Order, []),
- 'closed_orders' : (Order, [])
- },
- {'user_id' : 9,
- 'addresses' : (Address, []),
- 'open_orders' : (Order, [{'order_id' : 4}]),
- 'closed_orders' : (Order, [{'order_id' : 2}])
- }
- )
-
- def testmanytomany(self):
- """tests a many-to-many lazy load"""
- mapper(Item, orderitems, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),
- ))
- q = create_session().query(Item)
- l = q.select()
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 4, 'keywords' : (Keyword, [])},
- {'item_id' : 5, 'keywords' : (Keyword, [])}
- )
- l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, Item.c.item_id==itemkeywords.c.item_id))
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- )
-
-
-
-class EagerTest(MapperSuperTest):
- def testbasic(self):
- """tests a basic one-to-many eager load"""
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User, *user_address_result)
-
- def testorderby(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False, order_by=addresses.c.email_address),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])},
- {'user_id' : 9, 'addresses' : (Address, [])}
- )
-
-
- def testorderby_desc(self):
- m = mapper(Address, addresses)
-
- m = mapper(User, users, properties = dict(
- addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]),
- ))
- q = create_session().query(m)
- l = q.select()
-
- self.assert_result(l, User,
- {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
- {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}, ])},
- {'user_id' : 9, 'addresses' : (Address, [])},
- )
-
- def testlimit(self):
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = False)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ))
- sess = create_session()
- q = sess.query(m)
-
- if db.engine.name == 'mssql':
- l = q.select(limit=2)
- self.assert_result(l, User, *user_all_result[:2])
- else:
- l = q.select(limit=2, offset=1)
- self.assert_result(l, User, *user_all_result[1:3])
-
- # this is an involved 3x union of the users table to get a lot of rows.
- # then see if the "distinct" works its way out. you actually get the same
- # result with or without the distinct, just via less or more rows.
- u2 = users.alias('u2')
- s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
- l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True)
- self.assert_result(l, User, *user_all_result)
- sess.clear()
- clear_mappers()
- m = mapper(Item, orderitems, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
- ))
- q = sess.query(m)
- l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2)
- self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
-
- def testmorelimit(self):
- """test that the ORDER BY is propigated from the inner select to the outer select, when using the
- 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = False)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ))
- sess = create_session()
- q = sess.query(m)
-
- if db.engine.name != 'mssql':
- l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
- self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
-
- def testonetoone(self):
- m = mapper(User, users, properties = dict(
- address = relation(mapper(Address, addresses), lazy = False, uselist = False)
- ))
- q = create_session().query(m)
- l = q.select(users.c.user_id == 7)
- self.assert_result(l, User,
- {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})},
- )
-
- def testbackwardsonetoone(self):
- m = mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy = False)
- )).compile()
- self.echo(repr(m.props['user'].uselist))
- q = create_session().query(m)
- l = q.select(addresses.c.address_id == 1)
- self.assert_result(l, Address,
- {'address_id' : 1, 'email_address' : 'jack@bean.com',
- 'user' : (User, {'user_id' : 7, 'user_name' : 'jack'})
- },
- )
-
- def testorphanstate(self):
- """test that an eagerly loaded child object is not marked as an orphan"""
- m = mapper(User, users, properties={
- 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
- })
- mapper(Address, addresses)
-
- s = create_session()
- q = s.query(m)
- user = q.get(7)
- assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
- assert not class_mapper(Address)._is_orphan(user.addresses[0])
-
- def testwithrepeat(self):
- """tests a one-to-many eager load where we also query on joined criterion, where the joined
- criterion is using the same tables that are used within the eager load. the mapper must insure that the
- criterion doesnt interfere with the eager load criterion."""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False)
- ))
- q = create_session().query(m)
- l = q.select(and_(addresses.c.email_address == 'ed@lala.com', addresses.c.user_id==users.c.user_id))
- self.assert_result(l, User,
- {'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])},
- )
-
- def testcircular(self):
- """test that a circular eager relationship breaks the cycle with a lazy loader"""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy=False, backref=backref('user', lazy=False))
- ))
- assert class_mapper(User).props['addresses'].lazy is False
- assert class_mapper(Address).props['user'].lazy is False
- session = create_session()
- l = session.query(User).select()
- self.assert_result(l, User, *user_address_result)
-
- def testcompile(self):
- """tests deferred operation of a pre-compiled mapper statement"""
- session = create_session()
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False)
- ))
- s = session.query(m).compile(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.user_id))
- c = s.compile()
- self.echo("\n" + str(c) + repr(c.get_params()))
-
- l = m.instances(s.execute(emailad = 'jack@bean.com'), session)
- self.echo(repr(l))
-
- def testonselect(self):
- """test eager loading of a mapper which is against a select"""
-
- s = select([orders], orders.c.isopen==1).alias('openorders')
- mapper(Order, s, properties={
- 'user':relation(User, lazy=False)
- })
- mapper(User, users)
-
- q = create_session().query(Order)
- self.assert_result(q.list(), Order,
- {'order_id':3, 'user' : (User, {'user_id':7})},
- {'order_id':4, 'user' : (User, {'user_id':9})},
- )
-
- q = q.select_from(s.outerjoin(orderitems)).filter(orderitems.c.item_name != 'item 2')
- self.assert_result(q.list(), Order,
- {'order_id':3, 'user' : (User, {'user_id':7})},
- )
-
-
- def testmulti(self):
- """tests eager loading with two relations simultaneously"""
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False),
- orders = relation(mapper(Order, orders), lazy = False),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'addresses' : (Address, [{'address_id' : 1}]),
- 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},])
- },
- {'user_id' : 8,
- 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
- 'orders' : (Order, [])
- },
- {'user_id' : 9,
- 'addresses' : (Address, []),
- 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}])
- }
- )
-
- def testdouble(self):
- """tests eager loading with two relations simultaneously, from the same table. """
- openorders = alias(orders, 'openorders')
- closedorders = alias(orders, 'closedorders')
- ordermapper = mapper(Order, orders)
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False),
- closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False)
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'addresses' : (Address, [{'address_id' : 1}]),
- 'open_orders' : (Order, [{'order_id' : 3}]),
- 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},])
- },
- {'user_id' : 8,
- 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
- 'open_orders' : (Order, []),
- 'closed_orders' : (Order, [])
- },
- {'user_id' : 9,
- 'addresses' : (Address, []),
- 'open_orders' : (Order, [{'order_id' : 4}]),
- 'closed_orders' : (Order, [{'order_id' : 2}])
- }
- )
-
- def testdoublewithscalar(self):
- """tests eager loading with two relations from the same table, with one of them joining to the parent User. the other is the primary mapper. doesn't re-test addresses relation."""
- max_orders_by_user = select([func.max(orders.c.order_id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
- max_orders = orders.select(orders.c.order_id==max_orders_by_user.c.order_id).alias('max_orders')
- m = mapper(User, users, properties={
- 'orders':relation(mapper(Order, orders), backref='user', lazy=False),
- 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
- })
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User,
- {'user_id' : 7,
- 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]),
- 'max_order' : (Order, {'order_id' : 5})
- },
- {'user_id' : 8,
- 'orders' : (Order, []),
- 'max_order' : None,
- },
- {'user_id' : 9,
- 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]),
- 'max_order' : (Order, {'order_id' : 4})
- }
- )
-
- def testnested(self):
- """tests eager loading of a parent item with two types of child items,
- where one of those child items eager loads its own child items."""
- ordermapper = mapper(Order, orders, properties = dict(
- items = relation(mapper(Item, orderitems), lazy = False)
- ))
-
- m = mapper(User, users, properties = dict(
- addresses = relation(mapper(Address, addresses), lazy = False),
- orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, User, *user_all_result)
-
- def testmanytomany(self):
- items = orderitems
- m = mapper(Item, items, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
- ))
- q = create_session().query(m)
- l = q.select()
- self.assert_result(l, Item, *item_keyword_result)
-
- l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- )
-
-
- def testmanytomanyoptions(self):
- items = orderitems
-
- m = mapper(Item, items, properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]),
- ))
- q = create_session().query(m).options(eagerload('keywords'))
- def go():
- l = q.select()
- self.assert_result(l, Item, *item_keyword_result)
- self.assert_sql_count(db, go, 1)
-
- def go():
- l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
- self.assert_result(l, Item,
- {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
- {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
- )
- self.assert_sql_count(db, go, 1)
-
- def testoneandmany(self):
- """tests eager load for a parent object with a child object that
- contains a many-to-many relationship to a third object."""
- items = orderitems
-
- m = mapper(Item, items,
- properties = dict(
- keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
- ))
-
- m = mapper(Order, orders, properties = dict(
- items = relation(m, lazy = False)
- ))
- q = create_session().query(m)
- l = q.select("orders.order_id in (1,2,3)")
- self.assert_result(l, Order,
- {'order_id' : 1, 'items': (Item, [])},
- {'order_id' : 2, 'items': (Item, [
- {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])},
- {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])}
- ])},
- {'order_id' : 3, 'items': (Item, [
- {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])},
- {'item_id':4, 'item_name':'item 4'},
- {'item_id':5, 'item_name':'item 5'}
- ])},
- )
-
-class InstancesTest(MapperSuperTest):
- def testcustomfromalias(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=True)
- })
- mapper(Address, addresses)
- query = users.select(users.c.user_id==7).union(users.select(users.c.user_id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.user_id', addresses.c.address_id])
- q = create_session().query(User)
-
- def go():
- l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute())
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testbase.db, go, 1)
-
- def testcustomeagerquery(self):
- mapper(User, users, properties={
- # setting lazy=True - the contains_eager() option below
- # should imply eagerload()
- 'addresses':relation(Address, lazy=True)
- })
- mapper(Address, addresses)
-
- selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id])
- q = create_session().query(User)
-
- def go():
- l = q.options(contains_eager('addresses')).instances(selectquery.execute())
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testbase.db, go, 1)
-
- def testcustomeagerwithstringalias(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=False)
- })
- mapper(Address, addresses)
-
- adalias = addresses.alias('adalias')
- selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
- q = create_session().query(User)
-
- def go():
- l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute())
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testbase.db, go, 1)
-
- def testcustomeagerwithalias(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=False)
- })
- mapper(Address, addresses)
-
- adalias = addresses.alias('adalias')
- selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
- q = create_session().query(User)
-
- def go():
- l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testbase.db, go, 1)
-
- def testcustomeagerwithdecorator(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=False)
- })
- mapper(Address, addresses)
-
- adalias = addresses.alias('adalias')
- selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
- def decorate(row):
- d = {}
- for c in addresses.columns:
- d[c] = row[adalias.corresponding_column(c)]
- return d
-
- q = create_session().query(User)
-
- def go():
- l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute())
- self.assert_result(l, User, *user_address_result)
- self.assert_sql_count(testbase.db, go, 1)
-
- def testmultiplemappers(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=True)
- })
- mapper(Address, addresses)
-
- sess = create_session()
-
- (user7, user8, user9) = sess.query(User).select()
- (address1, address2, address3, address4) = sess.query(Address).select()
-
- selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id])
- q = sess.query(User)
- l = q.instances(selectquery.execute(), Address)
- # note the result is a cartesian product
- assert l == [
- (user7, address1),
- (user8, address2),
- (user8, address3),
- (user8, address4),
- (user9, None)
- ]
-
- def testmultipleonquery(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=True)
- })
- mapper(Address, addresses)
- sess = create_session()
- (user7, user8, user9) = sess.query(User).select()
- (address1, address2, address3, address4) = sess.query(Address).select()
- q = sess.query(User)
- q = q.add_entity(Address).outerjoin('addresses')
- l = q.list()
- assert l == [
- (user7, address1),
- (user8, address2),
- (user8, address3),
- (user8, address4),
- (user9, None)
- ]
-
- def testcolumnonquery(self):
- mapper(User, users, properties={
- 'addresses':relation(Address, lazy=True)
- })
- mapper(Address, addresses)
-
- sess = create_session()
- (user7, user8, user9) = sess.query(User).select()
- q = sess.query(User)
- q = q.group_by([c for c in users.c]).order_by(User.c.user_id).outerjoin('addresses').add_column(func.count(addresses.c.address_id).label('count'))
- l = q.list()
- assert l == [
- (user7, 1),
- (user8, 3),
- (user9, 0)
- ], repr(l)
-
- def testmapperspluscolumn(self):
- mapper(User, users)
- s = select([users, func.count(addresses.c.address_id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id])
- sess = create_session()
- (user7, user8, user9) = sess.query(User).select()
- q = sess.query(User)
- l = q.instances(s.execute(), "count")
- assert l == [
- (user7, 1),
- (user8, 3),
- (user9, 0)
- ]
-
- def testmappersplustwocolumns(self):
- mapper(User, users)
-
- # Fixme ticket #475!
- if db.engine.name == 'mysql':
- col2 = func.concat("Name:", users.c.user_name).label('concat')
- else:
- col2 = ("Name:" + users.c.user_name).label('concat')
-
- s = select([users, func.count(addresses.c.address_id).label('count'), col2], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id])
- sess = create_session()
- (user7, user8, user9) = sess.query(User).select()
- q = sess.query(User)
- l = q.instances(s.execute(), "count", "concat")
- print l
- assert l == [
- (user7, 1, "Name:jack"),
- (user8, 3, "Name:ed"),
- (user9, 0, "Name:fred")
- ]
if __name__ == "__main__":
--- /dev/null
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+from fixtures import *
+
+class Base(object):
+ def __init__(self, **kwargs):
+ for k in kwargs:
+ setattr(self, k, kwargs[k])
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __eq__(self, other):
+ """'passively' compare this object to another.
+
+ only look at attributes that are present on the source object.
+
+ """
+ # use __dict__ to avoid instrumented properties
+ for attr in self.__dict__.keys():
+ if attr[0] == '_':
+ continue
+ value = getattr(self, attr)
+ if hasattr(value, '__iter__') and not isinstance(value, basestring):
+ if len(value) == 0:
+ continue
+ for (us, them) in zip(value, getattr(other, attr)):
+ if us != them:
+ return False
+ else:
+ continue
+ else:
+ if value is not None:
+ if value != getattr(other, attr):
+ return False
+ else:
+ return True
+
+class QueryTest(testbase.ORMTest):
+ keep_mappers = True
+ keep_data = True
+
+ def setUpAll(self):
+ super(QueryTest, self).setUpAll()
+ install_fixture_data()
+ self.setup_mappers()
+
+ def tearDownAll(self):
+ clear_mappers()
+ super(QueryTest, self).tearDownAll()
+
+ def define_tables(self, meta):
+ # a slight dirty trick here.
+ meta.tables = metadata.tables
+ metadata.connect(meta.engine)
+
+ def setup_mappers(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address),
+ 'orders':relation(Order, backref='user'), # o2m, m2o
+ })
+ mapper(Address, addresses)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items), #m2m
+ 'address':relation(Address), # m2o
+ })
+ mapper(Item, items, properties={
+ 'keywords':relation(Keyword, secondary=item_keywords) #m2m
+ })
+ mapper(Keyword, keywords)
+
+
+class GetTest(QueryTest):
+ def test_get(self):
+ s = create_session()
+ assert s.query(User).get(19) is None
+ u = s.query(User).get(7)
+ u2 = s.query(User).get(7)
+ assert u is u2
+ s.clear()
+ u2 = s.query(User).get(7)
+ assert u is not u2
+
+ def test_unicode(self):
+ """test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail
+ on postgres, mysql and oracle unless it is converted to an encoded string"""
+
+ table = Table('unicode_data', users.metadata,
+ Column('id', Unicode(40), primary_key=True),
+ Column('data', Unicode(40)))
+ table.create()
+ ustring = 'petit voix m\xe2\x80\x99a '.decode('utf-8')
+ table.insert().execute(id=ustring, data=ustring)
+ class LocalFoo(Base):pass
+ mapper(LocalFoo, table)
+ assert create_session().query(LocalFoo).get(ustring) == LocalFoo(id=ustring, data=ustring)
+
+class CompileTest(QueryTest):
+ def test_deferred(self):
+ session = create_session()
+ s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.id)).compile()
+
+ l = session.query(User).instances(s.execute(emailad = 'jack@bean.com'))
+ assert [User(id=7)] == l
+
+class SliceTest(QueryTest):
+ def test_first(self):
+ assert User(id=7) == create_session().query(User).first()
+
+ assert create_session().query(User).filter(users.c.id==27).first() is None
+
+class FilterTest(QueryTest):
+ def test_basic(self):
+ assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()
+
+ def test_onefilter(self):
+ assert [User(id=8), User(id=9)] == create_session().query(User).filter(users.c.name.endswith('ed')).all()
+
+
+class ParentTest(QueryTest):
+ def test_o2m(self):
+ sess = create_session()
+ q = sess.query(User)
+
+ u1 = q.filter_by(name='jack').one()
+
+ # test auto-lookup of property
+ o = sess.query(Order).with_parent(u1).all()
+ assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+ # test with explicit property
+ o = sess.query(Order).with_parent(u1, property='orders').all()
+ assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+ # test static method
+ o = Query.query_from_parent(u1, property='orders', session=sess).all()
+ assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+ # test generative criterion
+ o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
+ assert [Order(description="order 3"), Order(description="order 5")] == o
+
+ def test_noparent(self):
+ sess = create_session()
+ q = sess.query(User)
+
+ u1 = q.filter_by(name='jack').one()
+
+ try:
+ q = sess.query(Item).with_parent(u1)
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'"
+
+ def test_m2m(self):
+ sess = create_session()
+ i1 = sess.query(Item).filter_by(id=2).one()
+ k = sess.query(Keyword).with_parent(i1).all()
+ assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k
+
+
+class JoinTest(QueryTest):
+ def test_overlapping_paths(self):
+ # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
+ result = create_session().query(User).join(['orders', 'items']).filter_by(id=3).reset_joinpoint().join(['orders','address']).filter_by(id=1).all()
+ assert [User(id=7, name='jack')] == result
+
+ def test_overlapping_paths_outerjoin(self):
+ result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address']).filter_by(id=1).all()
+ assert [User(id=7, name='jack')] == result
+
+ def test_overlap_with_aliases(self):
+ oalias = orders.alias('oalias')
+
+ result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).all()
+ assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
+
+ result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).filter_by(id=4).all()
+ assert [User(id=7, name='jack')] == result
+
+
+class SynonymTest(QueryTest):
+ keep_mappers = True
+ keep_data = True
+
+ def setup_mappers(self):
+ mapper(User, users, properties={
+ 'name_syn':synonym('name'),
+ 'addresses':relation(Address),
+ 'orders':relation(Order, backref='user'), # o2m, m2o
+ 'orders_syn':synonym('orders')
+ })
+ mapper(Address, addresses)
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items), #m2m
+ 'address':relation(Address), # m2o
+ 'items_syn':synonym('items')
+ })
+ mapper(Item, items, properties={
+ 'keywords':relation(Keyword, secondary=item_keywords) #m2m
+ })
+ mapper(Keyword, keywords)
+
+ def test_joins(self):
+ for j in (
+ ['orders', 'items'],
+ ['orders_syn', 'items'],
+ ['orders', 'items_syn'],
+ ['orders_syn', 'items_syn'],
+ ):
+ result = create_session().query(User).join(j).filter_by(id=3).all()
+ assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
+
+ def test_with_parent(self):
+ for nameprop, orderprop in (
+ ('name', 'orders'),
+ ('name_syn', 'orders'),
+ ('name', 'orders_syn'),
+ ('name_syn', 'orders_syn'),
+ ):
+ sess = create_session()
+ q = sess.query(User)
+
+ u1 = q.filter_by(**{nameprop:'jack'}).one()
+
+ o = sess.query(Order).with_parent(u1, property=orderprop).all()
+ assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+class InstancesTest(QueryTest):
+
+ def test_from_alias(self):
+
+ query = users.select(users.c.id==7).union(users.select(users.c.id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.id', addresses.c.id])
+ q = create_session().query(User)
+
+ def go():
+ l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute())
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_contains_eager(self):
+
+ selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
+ q = create_session().query(User)
+
+ def go():
+ l = q.options(contains_eager('addresses')).instances(selectquery.execute())
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_contains_eager_alias(self):
+ adalias = addresses.alias('adalias')
+ selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id])
+ q = create_session().query(User)
+
+ def go():
+ # test using a string alias name
+ l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute())
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def go():
+ # test using the Alias object itself
+ l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def decorate(row):
+ d = {}
+ for c in addresses.columns:
+ d[c] = row[adalias.corresponding_column(c)]
+ return d
+
+ def go():
+ # test using a custom 'decorate' function
+ l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute())
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_multi_mappers(self):
+ sess = create_session()
+
+ (user7, user8, user9, user10) = sess.query(User).all()
+ (address1, address2, address3, address4, address5) = sess.query(Address).all()
+
+ # note the result is a cartesian product
+ expected = [(user7, address1),
+ (user8, address2),
+ (user8, address3),
+ (user8, address4),
+ (user9, address5),
+ (user10, None)]
+
+ selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
+ q = sess.query(User)
+ l = q.instances(selectquery.execute(), Address)
+ assert l == expected
+
+ q = sess.query(User)
+ q = q.add_entity(Address).outerjoin('addresses')
+ l = q.all()
+ assert l == expected
+
+ q = sess.query(User).add_entity(Address)
+ l = q.join('addresses').filter_by(email_address='ed@bettyboop.com').all()
+ assert l == [(user8, address3)]
+
+ def test_multi_columns(self):
+ sess = create_session()
+ (user7, user8, user9, user10) = sess.query(User).all()
+ expected = [(user7, 1),
+ (user8, 3),
+ (user9, 1),
+ (user10, 0)
+ ]
+
+ q = sess.query(User)
+ q = q.group_by([c for c in users.c]).order_by(User.c.id).outerjoin('addresses').add_column(func.count(addresses.c.id).label('count'))
+ l = q.all()
+ assert l == expected
+
+ s = select([users, func.count(addresses.c.id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=users.c.id)
+ q = sess.query(User)
+ l = q.instances(s.execute(), "count")
+ assert l == expected
+
+ @testbase.unsupported('mysql') # only because of "+" operator requiring "concat" in mysql (fix #475)
+ def test_two_columns(self):
+ sess = create_session()
+ (user7, user8, user9, user10) = sess.query(User).all()
+ expected = [
+ (user7, 1, "Name:jack"),
+ (user8, 3, "Name:ed"),
+ (user9, 1, "Name:fred"),
+ (user10, 0, "Name:chuck")]
+
+ s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id])
+ q = create_session().query(User)
+ l = q.instances(s.execute(), "count", "concat")
+ assert l == expected
+
+
+if __name__ == '__main__':
+ testbase.main()
+
+