From: Mike Bayer Date: Thu, 12 Jul 2007 18:38:55 +0000 (+0000) Subject: - converted mapper.py unit test to 0.4's four separate mapper.py, query.py, eager_rel... X-Git-Tag: rel_0_3_9~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6c851956b549f30c8d5936ee542a1c1f2c5592df;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - converted mapper.py unit test to 0.4's four separate mapper.py, query.py, eager_relations.py, lazy_relations.py. tests 0.4 forwards compatibility for [ticket:631] - fixed "reset_joinpoint()" in query to actually work, when the same table appears in two join()s it reuses that same table as a joinpoint the way 0.4 does. --- diff --git a/CHANGES b/CHANGES index 9914f92954..6149e6f216 100644 --- a/CHANGES +++ b/CHANGES @@ -10,7 +10,13 @@ [ticket:597], and are constructed with a thunk instead - orm - forwards-compatibility with 0.4: added one(), first(), and - all() to Query + all() to Query. almost all Query functionality from 0.4 is + present in 0.3.9 for forwards-compat purposes. + - reset_joinpoint() really really works this time, promise ! lets + you re-join from the root: + query.join(['a', 'b']).filter().reset_joinpoint().\ + join(['a', 'c']).filter().all() + in 0.4 all join() calls start from the "root" - added synchronization to the mapper() construction step, to avoid thread collections when pre-existing mappers are compiling in a different thread [ticket:613] diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 9c3feacf05..dd73c2aeec 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -522,29 +522,43 @@ class Query(object): """ return self.filter(self._join_by(args, kwargs, start=self._joinpoint)) - def _join_to(self, prop, outerjoin=False): + def _join_to(self, prop, outerjoin=False, start=None): + if start is None: + start = self._joinpoint + if isinstance(prop, list): keys = prop else: - [keys,p] = self._locate_prop(prop, start=self._joinpoint) + [keys,p] = self._locate_prop(prop, start=start) + clause = self._from_obj[-1] - mapper = self._joinpoint + + currenttables = [clause] + class FindJoinedTables(sql.NoColumnVisitor): + def visit_join(self, join): + currenttables.append(join.left) + currenttables.append(join.right) + FindJoinedTables().traverse(clause) + + mapper = start for key in keys: prop = mapper.get_property(key, resolve_synonyms=True) if prop._is_self_referential(): raise exceptions.InvalidRequestError("Self-referential query on '%s' property must be constructed manually using an Alias object for the related table." % str(prop)) - if outerjoin: - if prop.secondary: - clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False)) - clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False)) - else: - clause = clause.outerjoin(prop.select_table, prop.get_join(mapper)) - else: - if prop.secondary: - clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False)) - clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False)) + # dont re-join to a table already in our from objects + if prop.select_table not in currenttables: + if outerjoin: + if prop.secondary: + clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False)) + clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False)) + else: + clause = clause.outerjoin(prop.select_table, prop.get_join(mapper)) else: - clause = clause.join(prop.select_table, prop.get_join(mapper)) + if prop.secondary: + clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False)) + clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False)) + else: + clause = clause.join(prop.select_table, prop.get_join(mapper)) mapper = prop.mapper return (clause, mapper) @@ -734,7 +748,7 @@ class Query(object): to be released in 0.4.""" q = self._clone() - q._joinpoint = q._mapper + q._joinpoint = q.mapper return q def select_from(self, from_obj): diff --git a/test/orm/alltests.py b/test/orm/alltests.py index 46eeb3ce92..35650c136f 100644 --- a/test/orm/alltests.py +++ b/test/orm/alltests.py @@ -5,6 +5,9 @@ def suite(): modules_to_test = ( 'orm.attributes', 'orm.mapper', + 'orm.query', + 'orm.lazy_relations', + 'orm.eager_relations', 'orm.generative', 'orm.lazytest1', 'orm.eagertest1', diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py new file mode 100644 index 0000000000..37b5ecdf7e --- /dev/null +++ b/test/orm/eager_relations.py @@ -0,0 +1,401 @@ +"""basic tests of eager loaded attributes""" + +from sqlalchemy import * +from sqlalchemy.orm import * +import testbase + +from fixtures import * +from query import QueryTest + +class EagerTest(QueryTest): + keep_mappers = False + + def setup_mappers(self): + pass + + def test_basic(self): + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False) + }) + sess = create_session() + q = sess.query(User) + + assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all() + assert fixtures.user_address_result == q.all() + + def test_no_orphan(self): + """test that an eagerly loaded child object is not marked as an orphan""" + + mapper(User, users, properties={ + 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False) + }) + mapper(Address, addresses) + + sess = create_session() + user = sess.query(User).get(7) + assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) + assert not class_mapper(Address)._is_orphan(user.addresses[0]) + + def test_orderby(self): + mapper(User, users, properties = { + 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address), + }) + q = create_session().query(User) + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=2, email_address='ed@wood.com') + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == q.all() + + def test_orderby_secondary(self): + """tests that a regular mapper select on a single table can order by a relation to a second table""" + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + )) + + q = create_session().query(User) + l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all() + + assert [ + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=7, addresses=[ + Address(id=1) + ]), + ] == l + + def test_orderby_desc(self): + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False, order_by=[desc(addresses.c.email_address)]), + )) + sess = create_session() + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=3, email_address='ed@bettyboop.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == sess.query(User).all() + + def test_many_to_many(self): + + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=keywords.c.id), + )) + + q = create_session().query(Item) + def go(): + assert fixtures.item_keyword_result == q.all() + self.assert_sql_count(testbase.db, go, 1) + + def go(): + assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all() + self.assert_sql_count(testbase.db, go, 1) + + + def test_eager_option(self): + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=True), + )) + + q = create_session().query(Item) + + def go(): + assert fixtures.item_keyword_result[0:2] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all() + + self.assert_sql_count(testbase.db, go, 1) + + def test_cyclical(self): + """test that a circular eager relationship breaks the cycle with a lazy loader""" + + mapper(Address, addresses) + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False, backref=backref('user', lazy=False)) + )) + assert class_mapper(User).get_property('addresses').lazy is False + assert class_mapper(Address).get_property('user').lazy is False + + sess = create_session() + assert fixtures.user_address_result == sess.query(User).all() + + def test_double(self): + """tests lazy loading with two relations simulatneously, from the same table, using aliases. """ + openorders = alias(orders, 'openorders') + closedorders = alias(orders, 'closedorders') + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=False), + closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=False) + )) + q = create_session().query(User) + + def go(): + assert [ + User( + id=7, + addresses=[Address(id=1)], + open_orders = [Order(id=3)], + closed_orders = [Order(id=1), Order(id=5)] + ), + User( + id=8, + addresses=[Address(id=2), Address(id=3), Address(id=4)], + open_orders = [], + closed_orders = [] + ), + User( + id=9, + addresses=[Address(id=5)], + open_orders = [Order(id=4)], + closed_orders = [Order(id=2)] + ), + User(id=10) + + ] == q.all() + self.assert_sql_count(testbase.db, go, 1) + + def test_limit(self): + """test limit operations combined with lazy-load relationships.""" + + mapper(Item, items) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items, lazy=False) + }) + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False), + 'orders':relation(Order, lazy=True) + }) + + sess = create_session() + q = sess.query(User) + + if testbase.db.engine.name == 'mssql': + l = q.limit(2).all() + assert fixtures.user_all_result[:2] == l + else: + l = q.limit(2).offset(1).all() + assert fixtures.user_all_result[1:3] == l + + def test_distinct(self): + # this is an involved 3x union of the users table to get a lot of rows. + # then see if the "distinct" works its way out. you actually get the same + # result with or without the distinct, just via less or more rows. + u2 = users.alias('u2') + s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') + + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=False), + }) + + sess = create_session() + q = sess.query(User) + + def go(): + l = q.filter(s.c.u2_id==User.c.id).distinct().all() + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def test_limit_2(self): + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]), + )) + + sess = create_session() + q = sess.query(Item) + l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\ + order_by(Item.c.id).limit(2).all() + + assert fixtures.item_keyword_result[1:3] == l + + def test_limit_3(self): + """test that the ORDER BY is propigated from the inner select to the outer select, when using the + 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses.""" + + mapper(Item, items) + mapper(Order, orders, properties = dict( + items = relation(Item, secondary=order_items, lazy=False) + )) + + mapper(Address, addresses) + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=False), + orders = relation(Order, lazy=False), + )) + sess = create_session() + + q = sess.query(User) + + if testbase.db.engine.name != 'mssql': + l = q.join('orders').order_by(desc(orders.c.user_id)).limit(2).offset(1) + assert [ + User(id=9, + orders=[Order(id=2), Order(id=4)], + addresses=[Address(id=5)] + ), + User(id=7, + orders=[Order(id=1), Order(id=3), Order(id=5)], + addresses=[Address(id=1)] + ) + ] == l.all() + + l = q.join('addresses').order_by(desc(addresses.c.email_address)).limit(1).offset(0) + assert [ + User(id=7, + orders=[Order(id=1), Order(id=3), Order(id=5)], + addresses=[Address(id=1)] + ) + ] == l.all() + + def test_one_to_many_scalar(self): + mapper(User, users, properties = dict( + address = relation(mapper(Address, addresses), lazy=False, uselist=False) + )) + q = create_session().query(User) + + def go(): + l = q.filter(users.c.id == 7).all() + assert [User(id=7, address=Address(id=1))] == l + self.assert_sql_count(testbase.db, go, 1) + + def test_many_to_one(self): + mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy=False) + )) + sess = create_session() + q = sess.query(Address) + + def go(): + a = q.filter(addresses.c.id==1).one() + assert a.user is not None + u1 = sess.query(User).get(7) + assert a.user is u1 + self.assert_sql_count(testbase.db, go, 1) + + + def test_one_and_many(self): + """tests eager load for a parent object with a child object that + contains a many-to-many relationship to a third object.""" + + mapper(User, users, properties={ + 'orders':relation(Order, lazy=False) + }) + mapper(Item, items) + mapper(Order, orders, properties = dict( + items = relation(Item, secondary=order_items, lazy=False, order_by=items.c.id) + )) + + q = create_session().query(User) + + l = q.filter("users.id in (7, 8, 9)") + + def go(): + assert fixtures.user_order_result[0:3] == l.all() + self.assert_sql_count(testbase.db, go, 1) + + def test_double_with_aggregate(self): + + max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user') + + max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders') + + mapper(Order, orders) + mapper(User, users, properties={ + 'orders':relation(Order, backref='user', lazy=False), + 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False) + }) + q = create_session().query(User) + + def go(): + assert [ + User(id=7, orders=[ + Order(id=1), + Order(id=3), + Order(id=5), + ], + max_order=Order(id=5) + ), + User(id=8, orders=[]), + User(id=9, orders=[Order(id=2),Order(id=4)], + max_order=Order(id=4) + ), + User(id=10), + ] == q.all() + self.assert_sql_count(testbase.db, go, 1) + + def test_wide(self): + mapper(Order, orders, properties={'items':relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)}) + mapper(Item, items) + mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False), + orders = relation(Order, lazy = False), + )) + q = create_session().query(User) + l = q.select() + assert fixtures.user_all_result == q.all() + + def test_against_select(self): + """test eager loading of a mapper which is against a select""" + + s = select([orders], orders.c.isopen==1).alias('openorders') + + mapper(Order, s, properties={ + 'user':relation(User, lazy=False) + }) + mapper(User, users) + + q = create_session().query(Order) + assert [ + Order(id=3, user=User(id=7)), + Order(id=4, user=User(id=9)) + ] == q.all() + + q = q.select_from(s.join(order_items).join(items)).filter(~items.c.id.in_(1, 2, 5)) + assert [ + Order(id=3, user=User(id=7)), + ] == q.all() + + def test_aliasing(self): + """test that eager loading uses aliases to insulate the eager load from regular criterion against those tables.""" + + mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy=False) + )) + q = create_session().query(User) + l = q.filter(addresses.c.email_address == 'ed@lala.com').filter(addresses.c.user_id==users.c.id) + assert fixtures.user_address_result[1:2] == l.all() + +if __name__ == '__main__': + testbase.main() diff --git a/test/orm/fixtures.py b/test/orm/fixtures.py new file mode 100644 index 0000000000..aed74c1701 --- /dev/null +++ b/test/orm/fixtures.py @@ -0,0 +1,229 @@ +from sqlalchemy import * + +_recursion_stack = util.Set() +class Base(object): + def __init__(self, **kwargs): + for k in kwargs: + setattr(self, k, kwargs[k]) + + def __ne__(self, other): + return not self.__eq__(other) + + def __eq__(self, other): + """'passively' compare this object to another. + + only look at attributes that are present on the source object. + + """ + + if self in _recursion_stack: + return True + _recursion_stack.add(self) + try: + # use __dict__ to avoid instrumented properties + for attr in self.__dict__.keys(): + if attr[0] == '_': + continue + value = getattr(self, attr) + if hasattr(value, '__iter__') and not isinstance(value, basestring): + if len(value) == 0: + continue + for (us, them) in zip(value, getattr(other, attr)): + if us != them: + return False + else: + continue + else: + if value is not None: + if value != getattr(other, attr): + return False + else: + return True + finally: + _recursion_stack.remove(self) + +class User(Base):pass +class Order(Base):pass +class Item(Base):pass +class Keyword(Base):pass +class Address(Base):pass + +metadata = MetaData() + +users = Table('users', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False)) + +orders = Table('orders', metadata, + Column('id', Integer, primary_key=True), + Column('user_id', None, ForeignKey('users.id')), + Column('address_id', None, ForeignKey('addresses.id')), + Column('description', String(30)), + Column('isopen', Integer) + ) + +addresses = Table('addresses', metadata, + Column('id', Integer, primary_key=True), + Column('user_id', None, ForeignKey('users.id')), + Column('email_address', String(50), nullable=False)) + +items = Table('items', metadata, + Column('id', Integer, primary_key=True), + Column('description', String(30), nullable=False) + ) + +order_items = Table('order_items', metadata, + Column('item_id', None, ForeignKey('items.id')), + Column('order_id', None, ForeignKey('orders.id'))) + +item_keywords = Table('item_keywords', metadata, + Column('item_id', None, ForeignKey('items.id')), + Column('keyword_id', None, ForeignKey('keywords.id'))) + +keywords = Table('keywords', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False) + ) + +def install_fixture_data(): + users.insert().execute( + dict(id = 7, name = 'jack'), + dict(id = 8, name = 'ed'), + dict(id = 9, name = 'fred'), + dict(id = 10, name = 'chuck'), + + ) + addresses.insert().execute( + dict(id = 1, user_id = 7, email_address = "jack@bean.com"), + dict(id = 2, user_id = 8, email_address = "ed@wood.com"), + dict(id = 3, user_id = 8, email_address = "ed@bettyboop.com"), + dict(id = 4, user_id = 8, email_address = "ed@lala.com"), + dict(id = 5, user_id = 9, email_address = "fred@fred.com"), + ) + orders.insert().execute( + dict(id = 1, user_id = 7, description = 'order 1', isopen=0, address_id=1), + dict(id = 2, user_id = 9, description = 'order 2', isopen=0, address_id=4), + dict(id = 3, user_id = 7, description = 'order 3', isopen=1, address_id=1), + dict(id = 4, user_id = 9, description = 'order 4', isopen=1, address_id=4), + dict(id = 5, user_id = 7, description = 'order 5', isopen=0, address_id=1) + ) + items.insert().execute( + dict(id=1, description='item 1'), + dict(id=2, description='item 2'), + dict(id=3, description='item 3'), + dict(id=4, description='item 4'), + dict(id=5, description='item 5'), + ) + order_items.insert().execute( + dict(item_id=1, order_id=1), + dict(item_id=2, order_id=1), + dict(item_id=3, order_id=1), + + dict(item_id=1, order_id=2), + dict(item_id=2, order_id=2), + dict(item_id=3, order_id=2), + + dict(item_id=3, order_id=3), + dict(item_id=4, order_id=3), + dict(item_id=5, order_id=3), + + dict(item_id=1, order_id=4), + dict(item_id=5, order_id=4), + + dict(item_id=5, order_id=5), + ) + keywords.insert().execute( + dict(id=1, name='blue'), + dict(id=2, name='red'), + dict(id=3, name='green'), + dict(id=4, name='big'), + dict(id=5, name='small'), + dict(id=6, name='round'), + dict(id=7, name='square') + ) + + # this many-to-many table has the keywords inserted + # in primary key order, to appease the unit tests. + # this is because postgres, oracle, and sqlite all support + # true insert-order row id, but of course our pal MySQL does not, + # so the best it can do is order by, well something, so there you go. + item_keywords.insert().execute( + dict(keyword_id=2, item_id=1), + dict(keyword_id=2, item_id=2), + dict(keyword_id=4, item_id=1), + dict(keyword_id=6, item_id=1), + dict(keyword_id=5, item_id=2), + dict(keyword_id=3, item_id=3), + dict(keyword_id=4, item_id=3), + dict(keyword_id=7, item_id=2), + dict(keyword_id=6, item_id=3) + ) + +class Fixtures(object): + @property + def user_address_result(self): + return [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] + + @property + def user_all_result(self): + return [ + User(id=7, addresses=[ + Address(id=1) + ], orders=[ + Order(description='order 1', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]), + Order(description='order 3'), + Order(description='order 5'), + ]), + User(id=8, addresses=[ + Address(id=2), + Address(id=3), + Address(id=4) + ]), + User(id=9, addresses=[ + Address(id=5) + ], orders=[ + Order(description='order 2', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]), + Order(description='order 4', items=[Item(description='item 1'), Item(description='item 5')]), + ]), + User(id=10, addresses=[]) + ] + + @property + def user_order_result(self): + return [ + User(id=7, orders=[ + Order(id=1, items=[Item(id=1), Item(id=2), Item(id=3)]), + Order(id=3, items=[Item(id=3), Item(id=4), Item(id=5)]), + Order(id=5, items=[Item(id=5)]), + ]), + User(id=8, orders=[]), + User(id=9, orders=[ + Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)]), + Order(id=4, items=[Item(id=1), Item(id=5)]), + ]), + User(id=10) + ] + + @property + def item_keyword_result(self): + return [ + Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]), + Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]), + Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]), + Item(id=4, keywords=[]), + Item(id=5, keywords=[]), + ] +fixtures = Fixtures() diff --git a/test/orm/lazy_relations.py b/test/orm/lazy_relations.py new file mode 100644 index 0000000000..5accb74713 --- /dev/null +++ b/test/orm/lazy_relations.py @@ -0,0 +1,265 @@ +"""basic tests of lazy loaded attributes""" + +from sqlalchemy import * +from sqlalchemy.orm import * +import testbase + +from fixtures import * +from query import QueryTest + +class LazyTest(QueryTest): + keep_mappers = False + + def setup_mappers(self): + pass + + def test_basic(self): + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=True) + }) + sess = create_session() + q = sess.query(User) + assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all() + + def test_bindstosession(self): + """test that lazy loaders use the mapper's contextual session if the parent instance + is not in a session, and that an error is raised if no contextual session""" + + from sqlalchemy.ext.sessioncontext import SessionContext + ctx = SessionContext(create_session) + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True) + ), extension=ctx.mapper_extension) + q = ctx.current.query(m) + u = q.filter(users.c.id == 7).first() + ctx.current.expunge(u) + assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u + + clear_mappers() + + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=True) + }) + try: + sess = create_session() + q = sess.query(User) + u = q.filter(users.c.id == 7).first() + sess.expunge(u) + assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u + except exceptions.InvalidRequestError, err: + assert "not bound to a Session, and no contextual session" in str(err) + + def test_orderby(self): + mapper(User, users, properties = { + 'addresses':relation(mapper(Address, addresses), lazy=True, order_by=addresses.c.email_address), + }) + q = create_session().query(User) + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=2, email_address='ed@wood.com') + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == q.all() + + def test_orderby_secondary(self): + """tests that a regular mapper select on a single table can order by a relation to a second table""" + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=True), + )) + q = create_session().query(User) + l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all() + assert [ + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=3, email_address='ed@bettyboop.com'), + Address(id=4, email_address='ed@lala.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=7, addresses=[ + Address(id=1) + ]), + ] == l + + def test_orderby_desc(self): + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy=True, order_by=[desc(addresses.c.email_address)]), + )) + sess = create_session() + assert [ + User(id=7, addresses=[ + Address(id=1) + ]), + User(id=8, addresses=[ + Address(id=2, email_address='ed@wood.com'), + Address(id=4, email_address='ed@lala.com'), + Address(id=3, email_address='ed@bettyboop.com'), + ]), + User(id=9, addresses=[ + Address(id=5) + ]), + User(id=10, addresses=[]) + ] == sess.query(User).all() + + def test_no_orphan(self): + """test that a lazily loaded child object is not marked as an orphan""" + + mapper(User, users, properties={ + 'addresses':relation(Address, cascade="all,delete-orphan", lazy=True) + }) + mapper(Address, addresses) + + sess = create_session() + user = sess.query(User).get(7) + assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) + assert not class_mapper(Address)._is_orphan(user.addresses[0]) + + def test_limit(self): + """test limit operations combined with lazy-load relationships.""" + + mapper(Item, items) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items, lazy=True) + }) + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=True), + 'orders':relation(Order, lazy=True) + }) + + sess = create_session() + q = sess.query(User) + + if testbase.db.engine.name == 'mssql': + l = q.limit(2).all() + assert fixtures.user_all_result[:2] == l + else: + l = q.limit(2).offset(1).all() + assert fixtures.user_all_result[1:3] == l + + def test_distinct(self): + mapper(Item, items) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items, lazy=True) + }) + mapper(User, users, properties={ + 'addresses':relation(mapper(Address, addresses), lazy=True), + 'orders':relation(Order, lazy=True) + }) + + sess = create_session() + q = sess.query(User) + + # use a union all to get a lot of rows to join against + u2 = users.alias('u2') + s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') + print [key for key in s.c.keys()] + l = q.filter(s.c.u2_id==User.c.id).distinct().all() + assert fixtures.user_all_result == l + + def test_one_to_many_scalar(self): + mapper(User, users, properties = dict( + address = relation(mapper(Address, addresses), lazy=True, uselist=False) + )) + q = create_session().query(User) + l = q.filter(users.c.id == 7).all() + assert [User(id=7, address=Address(id=1))] == l + + def test_double(self): + """tests lazy loading with two relations simulatneously, from the same table, using aliases. """ + openorders = alias(orders, 'openorders') + closedorders = alias(orders, 'closedorders') + + mapper(Address, addresses) + + mapper(User, users, properties = dict( + addresses = relation(Address, lazy = True), + open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True), + closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=True) + )) + q = create_session().query(User) + + assert [ + User( + id=7, + addresses=[Address(id=1)], + open_orders = [Order(id=3)], + closed_orders = [Order(id=1), Order(id=5)] + ), + User( + id=8, + addresses=[Address(id=2), Address(id=3), Address(id=4)], + open_orders = [], + closed_orders = [] + ), + User( + id=9, + addresses=[Address(id=5)], + open_orders = [Order(id=4)], + closed_orders = [Order(id=2)] + ), + User(id=10) + + ] == q.all() + + def test_many_to_many(self): + + mapper(Keyword, keywords) + mapper(Item, items, properties = dict( + keywords = relation(Keyword, secondary=item_keywords, lazy=True), + )) + + q = create_session().query(Item) + assert fixtures.item_keyword_result == q.all() + + assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all() + + def test_uses_get(self): + """test that a simple many-to-one lazyload optimizes to use query.get().""" + + mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy=True) + )) + + sess = create_session() + + # load address + a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one() + + # load user that is attached to the address + u1 = sess.query(User).get(8) + + def go(): + # lazy load of a1.user should get it from the session + assert a1.user is u1 + self.assert_sql_count(testbase.db, go, 0) + + def test_many_to_one(self): + mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy=True) + )) + sess = create_session() + q = sess.query(Address) + a = q.filter(addresses.c.id==1).one() + + assert a.user is not None + + u1 = sess.query(User).get(7) + + assert a.user is u1 + +if __name__ == '__main__': + testbase.main() diff --git a/test/orm/mapper.py b/test/orm/mapper.py index 197f988b7a..f366d4866e 100644 --- a/test/orm/mapper.py +++ b/test/orm/mapper.py @@ -990,742 +990,6 @@ class NoLoadTest(MapperSuperTest): ) -class LazyTest(MapperSuperTest): - - def testbasic(self): - """tests a basic one-to-many lazy load""" - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = True) - )) - q = create_session().query(m) - l = q.select(users.c.user_id == 7) - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])}, - ) - - def testbindstosession(self): - ctx = SessionContext(create_session) - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True) - ), extension=ctx.mapper_extension) - q = ctx.current.query(m) - u = q.filter(users.c.user_id == 7).selectfirst() - ctx.current.expunge(u) - self.assert_result([u], User, - {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])}, - ) - - def testcreateinstance(self): - class Ext(MapperExtension): - def create_instance(self, *args, **kwargs): - return User() - m = mapper(Address, addresses) - m = mapper(User, users, extension=Ext(), properties = dict( - addresses = relation(Address, lazy=True), - )) - - q = create_session().query(m) - l = q.select(); - self.assert_result(l, User, *user_address_result) - - def testorderby(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = True, order_by=addresses.c.email_address), - )) - q = create_session().query(m) - l = q.select() - - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])}, - {'user_id' : 9, 'addresses' : (Address, [])} - ) - - def testorderby_select(self): - """tests that a regular mapper select on a single table can order by a relation to a second table""" - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = True), - )) - q = create_session().query(m) - l = q.select(users.c.user_id==addresses.c.user_id, order_by=addresses.c.email_address) - - self.assert_result(l, User, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, ])}, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - ) - - def testorderby_desc(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = True, order_by=[desc(addresses.c.email_address)]), - )) - q = create_session().query(m) - l = q.select() - - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}])}, - {'user_id' : 9, 'addresses' : (Address, [])}, - ) - - def testorphanstate(self): - """test that a lazily loaded child object is not marked as an orphan""" - m = mapper(User, users, properties={ - 'addresses':relation(Address, cascade="all,delete-orphan", lazy=True) - }) - mapper(Address, addresses) - - q = create_session().query(m) - user = q.get(7) - assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) - assert not class_mapper(Address)._is_orphan(user.addresses[0]) - - def testlimit(self): - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = True) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = True), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = True), - )) - sess= create_session() - q = sess.query(m) - - if db.engine.name == 'mssql': - l = q.select(limit=2) - self.assert_result(l, User, *user_all_result[:2]) - else: - l = q.select(limit=2, offset=1) - self.assert_result(l, User, *user_all_result[1:3]) - - # use a union all to get a lot of rows to join against - u2 = users.alias('u2') - s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') - print [key for key in s.c.keys()] - l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True) - self.assert_result(l, User, *user_all_result) - - sess.clear() - clear_mappers() - m = mapper(Item, orderitems, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), - )) - - l = sess.query(m).select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2) - self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]]) - - def testonetoone(self): - m = mapper(User, users, properties = dict( - address = relation(mapper(Address, addresses), lazy = True, uselist = False) - )) - q = create_session().query(m) - l = q.select(users.c.user_id == 7) - self.assert_result(l, User, {'user_id':7, 'address' : (Address, {'address_id':1})}) - - def testbackwardsonetoone(self): - m = mapper(Address, addresses, properties = dict( - user = relation(mapper(User, users), lazy = True) - )) - q = create_session().query(m) - l = q.select(addresses.c.address_id == 1) - self.echo(repr(l)) - print repr(l[0].__dict__) - self.echo(repr(l[0].user)) - self.assert_(l[0].user is not None) - - def testuseget(self): - """test that a simple many-to-one lazyload optimizes to use query.get(). - - this is done currently by comparing the 'get' SQL clause of the query - to the 'lazy' SQL clause of the lazy loader, so it relies heavily on - ClauseElement.compare()""" - - m = mapper(Address, addresses, properties = dict( - user = relation(mapper(User, users), lazy = True) - )) - sess = create_session() - a1 = sess.query(Address).get_by(email_address = "ed@wood.com") - u1 = sess.query(User).get(8) - def go(): - assert a1.user is u1 - self.assert_sql_count(db, go, 0) - - def testdouble(self): - """tests lazy loading with two relations simulatneously, from the same table, using aliases. """ - openorders = alias(orders, 'openorders') - closedorders = alias(orders, 'closedorders') - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = True), - open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = True), - closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = True) - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'addresses' : (Address, [{'address_id' : 1}]), - 'open_orders' : (Order, [{'order_id' : 3}]), - 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},]) - }, - {'user_id' : 8, - 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), - 'open_orders' : (Order, []), - 'closed_orders' : (Order, []) - }, - {'user_id' : 9, - 'addresses' : (Address, []), - 'open_orders' : (Order, [{'order_id' : 4}]), - 'closed_orders' : (Order, [{'order_id' : 2}]) - } - ) - - def testmanytomany(self): - """tests a many-to-many lazy load""" - mapper(Item, orderitems, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), - )) - q = create_session().query(Item) - l = q.select() - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 4, 'keywords' : (Keyword, [])}, - {'item_id' : 5, 'keywords' : (Keyword, [])} - ) - l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, Item.c.item_id==itemkeywords.c.item_id)) - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - ) - - - -class EagerTest(MapperSuperTest): - def testbasic(self): - """tests a basic one-to-many eager load""" - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, *user_address_result) - - def testorderby(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False, order_by=addresses.c.email_address), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])}, - {'user_id' : 9, 'addresses' : (Address, [])} - ) - - - def testorderby_desc(self): - m = mapper(Address, addresses) - - m = mapper(User, users, properties = dict( - addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]), - )) - q = create_session().query(m) - l = q.select() - - self.assert_result(l, User, - {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, - {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}, ])}, - {'user_id' : 9, 'addresses' : (Address, [])}, - ) - - def testlimit(self): - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = False) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), - )) - sess = create_session() - q = sess.query(m) - - if db.engine.name == 'mssql': - l = q.select(limit=2) - self.assert_result(l, User, *user_all_result[:2]) - else: - l = q.select(limit=2, offset=1) - self.assert_result(l, User, *user_all_result[1:3]) - - # this is an involved 3x union of the users table to get a lot of rows. - # then see if the "distinct" works its way out. you actually get the same - # result with or without the distinct, just via less or more rows. - u2 = users.alias('u2') - s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') - l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True) - self.assert_result(l, User, *user_all_result) - sess.clear() - clear_mappers() - m = mapper(Item, orderitems, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), - )) - q = sess.query(m) - l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2) - self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]]) - - def testmorelimit(self): - """test that the ORDER BY is propigated from the inner select to the outer select, when using the - 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses.""" - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = False) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), - )) - sess = create_session() - q = sess.query(m) - - if db.engine.name != 'mssql': - l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1) - self.assert_result(l, User, *(user_all_result[2], user_all_result[0])) - - def testonetoone(self): - m = mapper(User, users, properties = dict( - address = relation(mapper(Address, addresses), lazy = False, uselist = False) - )) - q = create_session().query(m) - l = q.select(users.c.user_id == 7) - self.assert_result(l, User, - {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})}, - ) - - def testbackwardsonetoone(self): - m = mapper(Address, addresses, properties = dict( - user = relation(mapper(User, users), lazy = False) - )).compile() - self.echo(repr(m.props['user'].uselist)) - q = create_session().query(m) - l = q.select(addresses.c.address_id == 1) - self.assert_result(l, Address, - {'address_id' : 1, 'email_address' : 'jack@bean.com', - 'user' : (User, {'user_id' : 7, 'user_name' : 'jack'}) - }, - ) - - def testorphanstate(self): - """test that an eagerly loaded child object is not marked as an orphan""" - m = mapper(User, users, properties={ - 'addresses':relation(Address, cascade="all,delete-orphan", lazy=False) - }) - mapper(Address, addresses) - - s = create_session() - q = s.query(m) - user = q.get(7) - assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True) - assert not class_mapper(Address)._is_orphan(user.addresses[0]) - - def testwithrepeat(self): - """tests a one-to-many eager load where we also query on joined criterion, where the joined - criterion is using the same tables that are used within the eager load. the mapper must insure that the - criterion doesnt interfere with the eager load criterion.""" - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False) - )) - q = create_session().query(m) - l = q.select(and_(addresses.c.email_address == 'ed@lala.com', addresses.c.user_id==users.c.user_id)) - self.assert_result(l, User, - {'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])}, - ) - - def testcircular(self): - """test that a circular eager relationship breaks the cycle with a lazy loader""" - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy=False, backref=backref('user', lazy=False)) - )) - assert class_mapper(User).props['addresses'].lazy is False - assert class_mapper(Address).props['user'].lazy is False - session = create_session() - l = session.query(User).select() - self.assert_result(l, User, *user_address_result) - - def testcompile(self): - """tests deferred operation of a pre-compiled mapper statement""" - session = create_session() - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False) - )) - s = session.query(m).compile(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.user_id)) - c = s.compile() - self.echo("\n" + str(c) + repr(c.get_params())) - - l = m.instances(s.execute(emailad = 'jack@bean.com'), session) - self.echo(repr(l)) - - def testonselect(self): - """test eager loading of a mapper which is against a select""" - - s = select([orders], orders.c.isopen==1).alias('openorders') - mapper(Order, s, properties={ - 'user':relation(User, lazy=False) - }) - mapper(User, users) - - q = create_session().query(Order) - self.assert_result(q.list(), Order, - {'order_id':3, 'user' : (User, {'user_id':7})}, - {'order_id':4, 'user' : (User, {'user_id':9})}, - ) - - q = q.select_from(s.outerjoin(orderitems)).filter(orderitems.c.item_name != 'item 2') - self.assert_result(q.list(), Order, - {'order_id':3, 'user' : (User, {'user_id':7})}, - ) - - - def testmulti(self): - """tests eager loading with two relations simultaneously""" - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False), - orders = relation(mapper(Order, orders), lazy = False), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'addresses' : (Address, [{'address_id' : 1}]), - 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]) - }, - {'user_id' : 8, - 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), - 'orders' : (Order, []) - }, - {'user_id' : 9, - 'addresses' : (Address, []), - 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]) - } - ) - - def testdouble(self): - """tests eager loading with two relations simultaneously, from the same table. """ - openorders = alias(orders, 'openorders') - closedorders = alias(orders, 'closedorders') - ordermapper = mapper(Order, orders) - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False), - closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False) - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'addresses' : (Address, [{'address_id' : 1}]), - 'open_orders' : (Order, [{'order_id' : 3}]), - 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},]) - }, - {'user_id' : 8, - 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), - 'open_orders' : (Order, []), - 'closed_orders' : (Order, []) - }, - {'user_id' : 9, - 'addresses' : (Address, []), - 'open_orders' : (Order, [{'order_id' : 4}]), - 'closed_orders' : (Order, [{'order_id' : 2}]) - } - ) - - def testdoublewithscalar(self): - """tests eager loading with two relations from the same table, with one of them joining to the parent User. the other is the primary mapper. doesn't re-test addresses relation.""" - max_orders_by_user = select([func.max(orders.c.order_id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user') - max_orders = orders.select(orders.c.order_id==max_orders_by_user.c.order_id).alias('max_orders') - m = mapper(User, users, properties={ - 'orders':relation(mapper(Order, orders), backref='user', lazy=False), - 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False) - }) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, - {'user_id' : 7, - 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]), - 'max_order' : (Order, {'order_id' : 5}) - }, - {'user_id' : 8, - 'orders' : (Order, []), - 'max_order' : None, - }, - {'user_id' : 9, - 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]), - 'max_order' : (Order, {'order_id' : 4}) - } - ) - - def testnested(self): - """tests eager loading of a parent item with two types of child items, - where one of those child items eager loads its own child items.""" - ordermapper = mapper(Order, orders, properties = dict( - items = relation(mapper(Item, orderitems), lazy = False) - )) - - m = mapper(User, users, properties = dict( - addresses = relation(mapper(Address, addresses), lazy = False), - orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, User, *user_all_result) - - def testmanytomany(self): - items = orderitems - m = mapper(Item, items, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]), - )) - q = create_session().query(m) - l = q.select() - self.assert_result(l, Item, *item_keyword_result) - - l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - ) - - - def testmanytomanyoptions(self): - items = orderitems - - m = mapper(Item, items, properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]), - )) - q = create_session().query(m).options(eagerload('keywords')) - def go(): - l = q.select() - self.assert_result(l, Item, *item_keyword_result) - self.assert_sql_count(db, go, 1) - - def go(): - l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) - self.assert_result(l, Item, - {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, - {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, - ) - self.assert_sql_count(db, go, 1) - - def testoneandmany(self): - """tests eager load for a parent object with a child object that - contains a many-to-many relationship to a third object.""" - items = orderitems - - m = mapper(Item, items, - properties = dict( - keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), - )) - - m = mapper(Order, orders, properties = dict( - items = relation(m, lazy = False) - )) - q = create_session().query(m) - l = q.select("orders.order_id in (1,2,3)") - self.assert_result(l, Order, - {'order_id' : 1, 'items': (Item, [])}, - {'order_id' : 2, 'items': (Item, [ - {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, - {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])} - ])}, - {'order_id' : 3, 'items': (Item, [ - {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, - {'item_id':4, 'item_name':'item 4'}, - {'item_id':5, 'item_name':'item 5'} - ])}, - ) - -class InstancesTest(MapperSuperTest): - def testcustomfromalias(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=True) - }) - mapper(Address, addresses) - query = users.select(users.c.user_id==7).union(users.select(users.c.user_id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.user_id', addresses.c.address_id]) - q = create_session().query(User) - - def go(): - l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute()) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testbase.db, go, 1) - - def testcustomeagerquery(self): - mapper(User, users, properties={ - # setting lazy=True - the contains_eager() option below - # should imply eagerload() - 'addresses':relation(Address, lazy=True) - }) - mapper(Address, addresses) - - selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id]) - q = create_session().query(User) - - def go(): - l = q.options(contains_eager('addresses')).instances(selectquery.execute()) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testbase.db, go, 1) - - def testcustomeagerwithstringalias(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=False) - }) - mapper(Address, addresses) - - adalias = addresses.alias('adalias') - selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id]) - q = create_session().query(User) - - def go(): - l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute()) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testbase.db, go, 1) - - def testcustomeagerwithalias(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=False) - }) - mapper(Address, addresses) - - adalias = addresses.alias('adalias') - selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id]) - q = create_session().query(User) - - def go(): - l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute()) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testbase.db, go, 1) - - def testcustomeagerwithdecorator(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=False) - }) - mapper(Address, addresses) - - adalias = addresses.alias('adalias') - selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id]) - def decorate(row): - d = {} - for c in addresses.columns: - d[c] = row[adalias.corresponding_column(c)] - return d - - q = create_session().query(User) - - def go(): - l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute()) - self.assert_result(l, User, *user_address_result) - self.assert_sql_count(testbase.db, go, 1) - - def testmultiplemappers(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=True) - }) - mapper(Address, addresses) - - sess = create_session() - - (user7, user8, user9) = sess.query(User).select() - (address1, address2, address3, address4) = sess.query(Address).select() - - selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id]) - q = sess.query(User) - l = q.instances(selectquery.execute(), Address) - # note the result is a cartesian product - assert l == [ - (user7, address1), - (user8, address2), - (user8, address3), - (user8, address4), - (user9, None) - ] - - def testmultipleonquery(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=True) - }) - mapper(Address, addresses) - sess = create_session() - (user7, user8, user9) = sess.query(User).select() - (address1, address2, address3, address4) = sess.query(Address).select() - q = sess.query(User) - q = q.add_entity(Address).outerjoin('addresses') - l = q.list() - assert l == [ - (user7, address1), - (user8, address2), - (user8, address3), - (user8, address4), - (user9, None) - ] - - def testcolumnonquery(self): - mapper(User, users, properties={ - 'addresses':relation(Address, lazy=True) - }) - mapper(Address, addresses) - - sess = create_session() - (user7, user8, user9) = sess.query(User).select() - q = sess.query(User) - q = q.group_by([c for c in users.c]).order_by(User.c.user_id).outerjoin('addresses').add_column(func.count(addresses.c.address_id).label('count')) - l = q.list() - assert l == [ - (user7, 1), - (user8, 3), - (user9, 0) - ], repr(l) - - def testmapperspluscolumn(self): - mapper(User, users) - s = select([users, func.count(addresses.c.address_id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id]) - sess = create_session() - (user7, user8, user9) = sess.query(User).select() - q = sess.query(User) - l = q.instances(s.execute(), "count") - assert l == [ - (user7, 1), - (user8, 3), - (user9, 0) - ] - - def testmappersplustwocolumns(self): - mapper(User, users) - - # Fixme ticket #475! - if db.engine.name == 'mysql': - col2 = func.concat("Name:", users.c.user_name).label('concat') - else: - col2 = ("Name:" + users.c.user_name).label('concat') - - s = select([users, func.count(addresses.c.address_id).label('count'), col2], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id]) - sess = create_session() - (user7, user8, user9) = sess.query(User).select() - q = sess.query(User) - l = q.instances(s.execute(), "count", "concat") - print l - assert l == [ - (user7, 1, "Name:jack"), - (user8, 3, "Name:ed"), - (user9, 0, "Name:fred") - ] if __name__ == "__main__": diff --git a/test/orm/query.py b/test/orm/query.py new file mode 100644 index 0000000000..f3172a11d1 --- /dev/null +++ b/test/orm/query.py @@ -0,0 +1,347 @@ +from sqlalchemy import * +from sqlalchemy.orm import * +import testbase +from fixtures import * + +class Base(object): + def __init__(self, **kwargs): + for k in kwargs: + setattr(self, k, kwargs[k]) + + def __ne__(self, other): + return not self.__eq__(other) + + def __eq__(self, other): + """'passively' compare this object to another. + + only look at attributes that are present on the source object. + + """ + # use __dict__ to avoid instrumented properties + for attr in self.__dict__.keys(): + if attr[0] == '_': + continue + value = getattr(self, attr) + if hasattr(value, '__iter__') and not isinstance(value, basestring): + if len(value) == 0: + continue + for (us, them) in zip(value, getattr(other, attr)): + if us != them: + return False + else: + continue + else: + if value is not None: + if value != getattr(other, attr): + return False + else: + return True + +class QueryTest(testbase.ORMTest): + keep_mappers = True + keep_data = True + + def setUpAll(self): + super(QueryTest, self).setUpAll() + install_fixture_data() + self.setup_mappers() + + def tearDownAll(self): + clear_mappers() + super(QueryTest, self).tearDownAll() + + def define_tables(self, meta): + # a slight dirty trick here. + meta.tables = metadata.tables + metadata.connect(meta.engine) + + def setup_mappers(self): + mapper(User, users, properties={ + 'addresses':relation(Address), + 'orders':relation(Order, backref='user'), # o2m, m2o + }) + mapper(Address, addresses) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items), #m2m + 'address':relation(Address), # m2o + }) + mapper(Item, items, properties={ + 'keywords':relation(Keyword, secondary=item_keywords) #m2m + }) + mapper(Keyword, keywords) + + +class GetTest(QueryTest): + def test_get(self): + s = create_session() + assert s.query(User).get(19) is None + u = s.query(User).get(7) + u2 = s.query(User).get(7) + assert u is u2 + s.clear() + u2 = s.query(User).get(7) + assert u is not u2 + + def test_unicode(self): + """test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail + on postgres, mysql and oracle unless it is converted to an encoded string""" + + table = Table('unicode_data', users.metadata, + Column('id', Unicode(40), primary_key=True), + Column('data', Unicode(40))) + table.create() + ustring = 'petit voix m\xe2\x80\x99a '.decode('utf-8') + table.insert().execute(id=ustring, data=ustring) + class LocalFoo(Base):pass + mapper(LocalFoo, table) + assert create_session().query(LocalFoo).get(ustring) == LocalFoo(id=ustring, data=ustring) + +class CompileTest(QueryTest): + def test_deferred(self): + session = create_session() + s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.id)).compile() + + l = session.query(User).instances(s.execute(emailad = 'jack@bean.com')) + assert [User(id=7)] == l + +class SliceTest(QueryTest): + def test_first(self): + assert User(id=7) == create_session().query(User).first() + + assert create_session().query(User).filter(users.c.id==27).first() is None + +class FilterTest(QueryTest): + def test_basic(self): + assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all() + + def test_onefilter(self): + assert [User(id=8), User(id=9)] == create_session().query(User).filter(users.c.name.endswith('ed')).all() + + +class ParentTest(QueryTest): + def test_o2m(self): + sess = create_session() + q = sess.query(User) + + u1 = q.filter_by(name='jack').one() + + # test auto-lookup of property + o = sess.query(Order).with_parent(u1).all() + assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + + # test with explicit property + o = sess.query(Order).with_parent(u1, property='orders').all() + assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + + # test static method + o = Query.query_from_parent(u1, property='orders', session=sess).all() + assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + + # test generative criterion + o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all() + assert [Order(description="order 3"), Order(description="order 5")] == o + + def test_noparent(self): + sess = create_session() + q = sess.query(User) + + u1 = q.filter_by(name='jack').one() + + try: + q = sess.query(Item).with_parent(u1) + assert False + except exceptions.InvalidRequestError, e: + assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'" + + def test_m2m(self): + sess = create_session() + i1 = sess.query(Item).filter_by(id=2).one() + k = sess.query(Keyword).with_parent(i1).all() + assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k + + +class JoinTest(QueryTest): + def test_overlapping_paths(self): + # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) + result = create_session().query(User).join(['orders', 'items']).filter_by(id=3).reset_joinpoint().join(['orders','address']).filter_by(id=1).all() + assert [User(id=7, name='jack')] == result + + def test_overlapping_paths_outerjoin(self): + result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address']).filter_by(id=1).all() + assert [User(id=7, name='jack')] == result + + def test_overlap_with_aliases(self): + oalias = orders.alias('oalias') + + result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).all() + assert [User(id=7, name='jack'), User(id=9, name='fred')] == result + + result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).filter_by(id=4).all() + assert [User(id=7, name='jack')] == result + + +class SynonymTest(QueryTest): + keep_mappers = True + keep_data = True + + def setup_mappers(self): + mapper(User, users, properties={ + 'name_syn':synonym('name'), + 'addresses':relation(Address), + 'orders':relation(Order, backref='user'), # o2m, m2o + 'orders_syn':synonym('orders') + }) + mapper(Address, addresses) + mapper(Order, orders, properties={ + 'items':relation(Item, secondary=order_items), #m2m + 'address':relation(Address), # m2o + 'items_syn':synonym('items') + }) + mapper(Item, items, properties={ + 'keywords':relation(Keyword, secondary=item_keywords) #m2m + }) + mapper(Keyword, keywords) + + def test_joins(self): + for j in ( + ['orders', 'items'], + ['orders_syn', 'items'], + ['orders', 'items_syn'], + ['orders_syn', 'items_syn'], + ): + result = create_session().query(User).join(j).filter_by(id=3).all() + assert [User(id=7, name='jack'), User(id=9, name='fred')] == result + + def test_with_parent(self): + for nameprop, orderprop in ( + ('name', 'orders'), + ('name_syn', 'orders'), + ('name', 'orders_syn'), + ('name_syn', 'orders_syn'), + ): + sess = create_session() + q = sess.query(User) + + u1 = q.filter_by(**{nameprop:'jack'}).one() + + o = sess.query(Order).with_parent(u1, property=orderprop).all() + assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + +class InstancesTest(QueryTest): + + def test_from_alias(self): + + query = users.select(users.c.id==7).union(users.select(users.c.id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.id', addresses.c.id]) + q = create_session().query(User) + + def go(): + l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute()) + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def test_contains_eager(self): + + selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id]) + q = create_session().query(User) + + def go(): + l = q.options(contains_eager('addresses')).instances(selectquery.execute()) + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def test_contains_eager_alias(self): + adalias = addresses.alias('adalias') + selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id]) + q = create_session().query(User) + + def go(): + # test using a string alias name + l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute()) + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def go(): + # test using the Alias object itself + l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute()) + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def decorate(row): + d = {} + for c in addresses.columns: + d[c] = row[adalias.corresponding_column(c)] + return d + + def go(): + # test using a custom 'decorate' function + l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute()) + assert fixtures.user_address_result == l + self.assert_sql_count(testbase.db, go, 1) + + def test_multi_mappers(self): + sess = create_session() + + (user7, user8, user9, user10) = sess.query(User).all() + (address1, address2, address3, address4, address5) = sess.query(Address).all() + + # note the result is a cartesian product + expected = [(user7, address1), + (user8, address2), + (user8, address3), + (user8, address4), + (user9, address5), + (user10, None)] + + selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id]) + q = sess.query(User) + l = q.instances(selectquery.execute(), Address) + assert l == expected + + q = sess.query(User) + q = q.add_entity(Address).outerjoin('addresses') + l = q.all() + assert l == expected + + q = sess.query(User).add_entity(Address) + l = q.join('addresses').filter_by(email_address='ed@bettyboop.com').all() + assert l == [(user8, address3)] + + def test_multi_columns(self): + sess = create_session() + (user7, user8, user9, user10) = sess.query(User).all() + expected = [(user7, 1), + (user8, 3), + (user9, 1), + (user10, 0) + ] + + q = sess.query(User) + q = q.group_by([c for c in users.c]).order_by(User.c.id).outerjoin('addresses').add_column(func.count(addresses.c.id).label('count')) + l = q.all() + assert l == expected + + s = select([users, func.count(addresses.c.id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=users.c.id) + q = sess.query(User) + l = q.instances(s.execute(), "count") + assert l == expected + + @testbase.unsupported('mysql') # only because of "+" operator requiring "concat" in mysql (fix #475) + def test_two_columns(self): + sess = create_session() + (user7, user8, user9, user10) = sess.query(User).all() + expected = [ + (user7, 1, "Name:jack"), + (user8, 3, "Name:ed"), + (user9, 1, "Name:fred"), + (user10, 0, "Name:chuck")] + + s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id]) + q = create_session().query(User) + l = q.instances(s.execute(), "count", "concat") + assert l == expected + + +if __name__ == '__main__': + testbase.main() + +