]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- converted mapper.py unit test to 0.4's four separate mapper.py, query.py, eager_rel...
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 12 Jul 2007 18:38:55 +0000 (18:38 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 12 Jul 2007 18:38:55 +0000 (18:38 +0000)
tests 0.4 forwards compatibility for [ticket:631]
- fixed "reset_joinpoint()" in query to actually work, when the same table appears in two join()s it reuses that
same table as a joinpoint the way 0.4 does.

CHANGES
lib/sqlalchemy/orm/query.py
test/orm/alltests.py
test/orm/eager_relations.py [new file with mode: 0644]
test/orm/fixtures.py [new file with mode: 0644]
test/orm/lazy_relations.py [new file with mode: 0644]
test/orm/mapper.py
test/orm/query.py [new file with mode: 0644]

diff --git a/CHANGES b/CHANGES
index 9914f9295494e929360169aa3fc6ce4aa35851ee..6149e6f2169f541071bca0972a03487977577db9 100644 (file)
--- a/CHANGES
+++ b/CHANGES
       [ticket:597], and are constructed with a thunk instead
 - orm
     - forwards-compatibility with 0.4: added one(), first(), and 
-      all() to Query
+      all() to Query.  almost all Query functionality from 0.4 is
+      present in 0.3.9 for forwards-compat purposes.
+    - reset_joinpoint() really really works this time, promise ! lets
+      you re-join from the root:
+      query.join(['a', 'b']).filter(<crit>).reset_joinpoint().\
+      join(['a', 'c']).filter(<some other crit>).all()
+      in 0.4 all join() calls start from the "root"
     - added synchronization to the mapper() construction step, to avoid
       thread collections when pre-existing mappers are compiling in a 
       different thread [ticket:613]
index 9c3feacf05dada7246c1a31b89dd457bd2535ff1..dd73c2aeecc953f41dd965e7544bd4154d551b63 100644 (file)
@@ -522,29 +522,43 @@ class Query(object):
         """
         return self.filter(self._join_by(args, kwargs, start=self._joinpoint))
 
-    def _join_to(self, prop, outerjoin=False):
+    def _join_to(self, prop, outerjoin=False, start=None):
+        if start is None:
+            start = self._joinpoint
+
         if isinstance(prop, list):
             keys = prop
         else:
-            [keys,p] = self._locate_prop(prop, start=self._joinpoint)
+            [keys,p] = self._locate_prop(prop, start=start)
+
         clause = self._from_obj[-1]
-        mapper = self._joinpoint
+
+        currenttables = [clause]
+        class FindJoinedTables(sql.NoColumnVisitor):
+            def visit_join(self, join):
+                currenttables.append(join.left)
+                currenttables.append(join.right)
+        FindJoinedTables().traverse(clause)
+            
+        mapper = start
         for key in keys:
             prop = mapper.get_property(key, resolve_synonyms=True)
             if prop._is_self_referential():
                 raise exceptions.InvalidRequestError("Self-referential query on '%s' property must be constructed manually using an Alias object for the related table." % str(prop))
-            if outerjoin:
-                if prop.secondary:
-                    clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
-                    clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False))
-                else:
-                    clause = clause.outerjoin(prop.select_table, prop.get_join(mapper))
-            else:
-                if prop.secondary:
-                    clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
-                    clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False))
+            # dont re-join to a table already in our from objects
+            if prop.select_table not in currenttables:
+                if outerjoin:
+                    if prop.secondary:
+                        clause = clause.outerjoin(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
+                        clause = clause.outerjoin(prop.select_table, prop.get_join(mapper, primary=False))
+                    else:
+                        clause = clause.outerjoin(prop.select_table, prop.get_join(mapper))
                 else:
-                    clause = clause.join(prop.select_table, prop.get_join(mapper))
+                    if prop.secondary:
+                        clause = clause.join(prop.secondary, prop.get_join(mapper, primary=True, secondary=False))
+                        clause = clause.join(prop.select_table, prop.get_join(mapper, primary=False))
+                    else:
+                        clause = clause.join(prop.select_table, prop.get_join(mapper))
             mapper = prop.mapper
         return (clause, mapper)
 
@@ -734,7 +748,7 @@ class Query(object):
         to be released in 0.4."""
         
         q = self._clone()
-        q._joinpoint = q._mapper
+        q._joinpoint = q.mapper
         return q
 
     def select_from(self, from_obj):
index 46eeb3ce92b374cc72d310e6fc75fa791dc94255..35650c136f8623ab03c20ebef8de82b969ea8a83 100644 (file)
@@ -5,6 +5,9 @@ def suite():
     modules_to_test = (
        'orm.attributes',
         'orm.mapper',
+        'orm.query',
+        'orm.lazy_relations',
+        'orm.eager_relations',
         'orm.generative',
         'orm.lazytest1',
         'orm.eagertest1',
diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py
new file mode 100644 (file)
index 0000000..37b5ecd
--- /dev/null
@@ -0,0 +1,401 @@
+"""basic tests of eager loaded attributes"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+
+from fixtures import *
+from query import QueryTest
+
+class EagerTest(QueryTest):
+    keep_mappers = False
+
+    def setup_mappers(self):
+        pass
+
+    def test_basic(self):
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=False)
+        })
+        sess = create_session()
+        q = sess.query(User)
+
+        assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+        assert fixtures.user_address_result == q.all()
+
+    def test_no_orphan(self):
+        """test that an eagerly loaded child object is not marked as an orphan"""
+
+        mapper(User, users, properties={
+            'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
+        })
+        mapper(Address, addresses)
+
+        sess = create_session()
+        user = sess.query(User).get(7)
+        assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
+        assert not class_mapper(Address)._is_orphan(user.addresses[0])
+
+    def test_orderby(self):
+        mapper(User, users, properties = {
+            'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.email_address),
+        })
+        q = create_session().query(User)
+        assert [
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+            User(id=8, addresses=[
+                Address(id=3, email_address='ed@bettyboop.com'),
+                Address(id=4, email_address='ed@lala.com'),
+                Address(id=2, email_address='ed@wood.com')
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=10, addresses=[])
+        ] == q.all()
+
+    def test_orderby_secondary(self):
+        """tests that a regular mapper select on a single table can order by a relation to a second table"""
+
+        mapper(Address, addresses)
+
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=False),
+        ))
+        
+        q = create_session().query(User)
+        l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all()
+        
+        assert [
+            User(id=8, addresses=[
+                Address(id=2, email_address='ed@wood.com'),
+                Address(id=3, email_address='ed@bettyboop.com'),
+                Address(id=4, email_address='ed@lala.com'),
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+        ] == l
+
+    def test_orderby_desc(self):
+        mapper(Address, addresses)
+
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=False, order_by=[desc(addresses.c.email_address)]),
+        ))
+        sess = create_session()
+        assert [
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+            User(id=8, addresses=[
+                Address(id=2, email_address='ed@wood.com'),
+                Address(id=4, email_address='ed@lala.com'),
+                Address(id=3, email_address='ed@bettyboop.com'),
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=10, addresses=[])
+        ] == sess.query(User).all()
+
+    def test_many_to_many(self):
+
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=keywords.c.id),
+        ))
+
+        q = create_session().query(Item)
+        def go():
+            assert fixtures.item_keyword_result == q.all()
+        self.assert_sql_count(testbase.db, go, 1)
+        
+        def go():
+            assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all()
+        self.assert_sql_count(testbase.db, go, 1)
+
+
+    def test_eager_option(self):
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relation(Keyword, secondary=item_keywords, lazy=True),
+        ))
+
+        q = create_session().query(Item)
+
+        def go():
+            assert fixtures.item_keyword_result[0:2] == q.options(eagerload('keywords')).join('keywords').filter(keywords.c.name == 'red').all()
+            
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_cyclical(self):
+        """test that a circular eager relationship breaks the cycle with a lazy loader"""
+        
+        mapper(Address, addresses)
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=False, backref=backref('user', lazy=False))
+        ))
+        assert class_mapper(User).get_property('addresses').lazy is False
+        assert class_mapper(Address).get_property('user').lazy is False
+        
+        sess = create_session()
+        assert fixtures.user_address_result == sess.query(User).all()
+        
+    def test_double(self):
+        """tests lazy loading with two relations simulatneously, from the same table, using aliases.  """
+        openorders = alias(orders, 'openorders')
+        closedorders = alias(orders, 'closedorders')
+
+        mapper(Address, addresses)
+
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=False),
+            open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=False),
+            closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=False)
+        ))
+        q = create_session().query(User)
+
+        def go():
+            assert [
+                User(
+                    id=7,
+                    addresses=[Address(id=1)],
+                    open_orders = [Order(id=3)],
+                    closed_orders = [Order(id=1), Order(id=5)]
+                ),
+                User(
+                    id=8,
+                    addresses=[Address(id=2), Address(id=3), Address(id=4)],
+                    open_orders = [],
+                    closed_orders = []
+                ),
+                User(
+                    id=9,
+                    addresses=[Address(id=5)],
+                    open_orders = [Order(id=4)],
+                    closed_orders = [Order(id=2)]
+                ),
+                User(id=10)
+
+            ] == q.all()
+        self.assert_sql_count(testbase.db, go, 1)
+        
+    def test_limit(self):
+        """test limit operations combined with lazy-load relationships."""
+
+        mapper(Item, items)
+        mapper(Order, orders, properties={
+            'items':relation(Item, secondary=order_items, lazy=False)
+        })
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=False),
+            'orders':relation(Order, lazy=True)
+        })
+
+        sess = create_session()
+        q = sess.query(User)
+
+        if testbase.db.engine.name == 'mssql':
+            l = q.limit(2).all()
+            assert fixtures.user_all_result[:2] == l
+        else:        
+            l = q.limit(2).offset(1).all()
+            assert fixtures.user_all_result[1:3] == l
+    
+    def test_distinct(self):
+        # this is an involved 3x union of the users table to get a lot of rows.
+        # then see if the "distinct" works its way out.  you actually get the same
+        # result with or without the distinct, just via less or more rows.
+        u2 = users.alias('u2')
+        s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
+
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=False),
+        })
+
+        sess = create_session()
+        q = sess.query(User)
+
+        def go():
+            l = q.filter(s.c.u2_id==User.c.id).distinct().all()
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+        
+    def test_limit_2(self):
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relation(Keyword, secondary=item_keywords, lazy=False, order_by=[keywords.c.id]),
+            ))
+            
+        sess = create_session()
+        q = sess.query(Item)
+        l = q.filter((Item.c.description=='item 2') | (Item.c.description=='item 5') | (Item.c.description=='item 3')).\
+            order_by(Item.c.id).limit(2).all()
+
+        assert fixtures.item_keyword_result[1:3] == l
+        
+    def test_limit_3(self):
+        """test that the ORDER BY is propigated from the inner select to the outer select, when using the 
+        'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
+        
+        mapper(Item, items)
+        mapper(Order, orders, properties = dict(
+                items = relation(Item, secondary=order_items, lazy=False)
+        ))
+
+        mapper(Address, addresses)
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=False),
+            orders = relation(Order, lazy=False),
+        ))
+        sess = create_session()
+        
+        q = sess.query(User)
+
+        if testbase.db.engine.name != 'mssql':
+            l = q.join('orders').order_by(desc(orders.c.user_id)).limit(2).offset(1)
+            assert [
+                User(id=9, 
+                    orders=[Order(id=2), Order(id=4)],
+                    addresses=[Address(id=5)]
+                ),
+                User(id=7, 
+                    orders=[Order(id=1), Order(id=3), Order(id=5)],
+                    addresses=[Address(id=1)]
+                )
+            ] == l.all()
+
+        l = q.join('addresses').order_by(desc(addresses.c.email_address)).limit(1).offset(0)
+        assert [
+            User(id=7, 
+                orders=[Order(id=1), Order(id=3), Order(id=5)],
+                addresses=[Address(id=1)]
+            )
+        ] == l.all()
+
+    def test_one_to_many_scalar(self):
+        mapper(User, users, properties = dict(
+            address = relation(mapper(Address, addresses), lazy=False, uselist=False)
+        ))
+        q = create_session().query(User)
+        
+        def go():
+            l = q.filter(users.c.id == 7).all()
+            assert [User(id=7, address=Address(id=1))] == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_many_to_one(self):
+        mapper(Address, addresses, properties = dict(
+            user = relation(mapper(User, users), lazy=False)
+        ))
+        sess = create_session()
+        q = sess.query(Address)
+        
+        def go():
+            a = q.filter(addresses.c.id==1).one()
+            assert a.user is not None
+            u1 = sess.query(User).get(7)
+            assert a.user is u1
+        self.assert_sql_count(testbase.db, go, 1)
+        
+
+    def test_one_and_many(self):
+        """tests eager load for a parent object with a child object that 
+        contains a many-to-many relationship to a third object."""
+        
+        mapper(User, users, properties={
+            'orders':relation(Order, lazy=False)
+        })
+        mapper(Item, items) 
+        mapper(Order, orders, properties = dict(
+                items = relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)
+            ))
+            
+        q = create_session().query(User)
+        
+        l = q.filter("users.id in (7, 8, 9)")
+        
+        def go():
+            assert fixtures.user_order_result[0:3] == l.all()
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_double_with_aggregate(self):
+
+        max_orders_by_user = select([func.max(orders.c.id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
+        
+        max_orders = orders.select(orders.c.id==max_orders_by_user.c.order_id).alias('max_orders')
+        
+        mapper(Order, orders)
+        mapper(User, users, properties={
+               'orders':relation(Order, backref='user', lazy=False),
+               'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
+               })
+        q = create_session().query(User)
+        
+        def go():
+            assert [
+                User(id=7, orders=[
+                        Order(id=1),
+                        Order(id=3),
+                        Order(id=5),
+                    ], 
+                    max_order=Order(id=5)
+                ),
+                User(id=8, orders=[]),
+                User(id=9, orders=[Order(id=2),Order(id=4)], 
+                    max_order=Order(id=4)
+                ),
+                User(id=10),
+            ] == q.all()
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_wide(self):
+        mapper(Order, orders, properties={'items':relation(Item, secondary=order_items, lazy=False, order_by=items.c.id)})
+        mapper(Item, items)
+        mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy = False),
+            orders = relation(Order, lazy = False),
+        ))
+        q = create_session().query(User)
+        l = q.select()
+        assert fixtures.user_all_result == q.all()
+
+    def test_against_select(self):
+        """test eager loading of a mapper which is against a select"""
+
+        s = select([orders], orders.c.isopen==1).alias('openorders')
+        
+        mapper(Order, s, properties={
+            'user':relation(User, lazy=False)
+        })
+        mapper(User, users)
+
+        q = create_session().query(Order)
+        assert [
+            Order(id=3, user=User(id=7)),
+            Order(id=4, user=User(id=9))
+        ] == q.all()
+        
+        q = q.select_from(s.join(order_items).join(items)).filter(~items.c.id.in_(1, 2, 5))
+        assert [
+            Order(id=3, user=User(id=7)),
+        ] == q.all()
+
+    def test_aliasing(self):
+        """test that eager loading uses aliases to insulate the eager load from regular criterion against those tables."""
+        
+        mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses), lazy=False)
+        ))
+        q = create_session().query(User)
+        l = q.filter(addresses.c.email_address == 'ed@lala.com').filter(addresses.c.user_id==users.c.id)
+        assert fixtures.user_address_result[1:2] == l.all()
+
+if __name__ == '__main__':
+    testbase.main()
diff --git a/test/orm/fixtures.py b/test/orm/fixtures.py
new file mode 100644 (file)
index 0000000..aed74c1
--- /dev/null
@@ -0,0 +1,229 @@
+from sqlalchemy import *
+
+_recursion_stack = util.Set()
+class Base(object):
+    def __init__(self, **kwargs):
+        for k in kwargs:
+            setattr(self, k, kwargs[k])
+            
+    def __ne__(self, other):
+        return not self.__eq__(other)
+        
+    def __eq__(self, other):
+        """'passively' compare this object to another.
+        
+        only look at attributes that are present on the source object.
+        
+        """
+        
+        if self in _recursion_stack:
+            return True
+        _recursion_stack.add(self)
+        try:
+            # use __dict__ to avoid instrumented properties
+            for attr in self.__dict__.keys():
+                if attr[0] == '_':
+                    continue
+                value = getattr(self, attr)
+                if hasattr(value, '__iter__') and not isinstance(value, basestring):
+                    if len(value) == 0:
+                        continue
+                    for (us, them) in zip(value, getattr(other, attr)):
+                        if us != them:
+                            return False
+                    else:
+                        continue
+                else:
+                    if value is not None:
+                        if value != getattr(other, attr):
+                            return False
+            else:
+                return True
+        finally:
+            _recursion_stack.remove(self)
+            
+class User(Base):pass
+class Order(Base):pass
+class Item(Base):pass
+class Keyword(Base):pass
+class Address(Base):pass
+
+metadata = MetaData()
+
+users = Table('users', metadata,
+    Column('id', Integer, primary_key=True),
+    Column('name', String(30), nullable=False))
+
+orders = Table('orders', metadata,
+    Column('id', Integer, primary_key=True),
+    Column('user_id', None, ForeignKey('users.id')),
+    Column('address_id', None, ForeignKey('addresses.id')),
+    Column('description', String(30)),
+    Column('isopen', Integer)
+    )
+
+addresses = Table('addresses', metadata, 
+    Column('id', Integer, primary_key=True),
+    Column('user_id', None, ForeignKey('users.id')),
+    Column('email_address', String(50), nullable=False))
+
+items = Table('items', metadata, 
+    Column('id', Integer, primary_key=True),
+    Column('description', String(30), nullable=False)
+    )
+
+order_items = Table('order_items', metadata,
+    Column('item_id', None, ForeignKey('items.id')),
+    Column('order_id', None, ForeignKey('orders.id')))
+
+item_keywords = Table('item_keywords', metadata, 
+    Column('item_id', None, ForeignKey('items.id')),
+    Column('keyword_id', None, ForeignKey('keywords.id')))
+
+keywords = Table('keywords', metadata, 
+    Column('id', Integer, primary_key=True),
+    Column('name', String(30), nullable=False)
+    )
+
+def install_fixture_data():
+    users.insert().execute(
+        dict(id = 7, name = 'jack'),
+        dict(id = 8, name = 'ed'),
+        dict(id = 9, name = 'fred'),
+        dict(id = 10, name = 'chuck'),
+
+    )
+    addresses.insert().execute(
+        dict(id = 1, user_id = 7, email_address = "jack@bean.com"),
+        dict(id = 2, user_id = 8, email_address = "ed@wood.com"),
+        dict(id = 3, user_id = 8, email_address = "ed@bettyboop.com"),
+        dict(id = 4, user_id = 8, email_address = "ed@lala.com"),
+        dict(id = 5, user_id = 9, email_address = "fred@fred.com"),
+    )
+    orders.insert().execute(
+        dict(id = 1, user_id = 7, description = 'order 1', isopen=0, address_id=1),
+        dict(id = 2, user_id = 9, description = 'order 2', isopen=0, address_id=4),
+        dict(id = 3, user_id = 7, description = 'order 3', isopen=1, address_id=1),
+        dict(id = 4, user_id = 9, description = 'order 4', isopen=1, address_id=4),
+        dict(id = 5, user_id = 7, description = 'order 5', isopen=0, address_id=1)
+    )
+    items.insert().execute(
+        dict(id=1, description='item 1'),
+        dict(id=2, description='item 2'),
+        dict(id=3, description='item 3'),
+        dict(id=4, description='item 4'),
+        dict(id=5, description='item 5'),
+    )
+    order_items.insert().execute(
+        dict(item_id=1, order_id=1),
+        dict(item_id=2, order_id=1),
+        dict(item_id=3, order_id=1),
+
+        dict(item_id=1, order_id=2),
+        dict(item_id=2, order_id=2),
+        dict(item_id=3, order_id=2),
+
+        dict(item_id=3, order_id=3),
+        dict(item_id=4, order_id=3),
+        dict(item_id=5, order_id=3),
+
+        dict(item_id=1, order_id=4),
+        dict(item_id=5, order_id=4),
+
+        dict(item_id=5, order_id=5),
+    )
+    keywords.insert().execute(
+        dict(id=1, name='blue'),
+        dict(id=2, name='red'),
+        dict(id=3, name='green'),
+        dict(id=4, name='big'),
+        dict(id=5, name='small'),
+        dict(id=6, name='round'),
+        dict(id=7, name='square')
+    )
+
+    # this many-to-many table has the keywords inserted
+    # in primary key order, to appease the unit tests.
+    # this is because postgres, oracle, and sqlite all support 
+    # true insert-order row id, but of course our pal MySQL does not,
+    # so the best it can do is order by, well something, so there you go.
+    item_keywords.insert().execute(
+        dict(keyword_id=2, item_id=1),
+        dict(keyword_id=2, item_id=2),
+        dict(keyword_id=4, item_id=1),
+        dict(keyword_id=6, item_id=1),
+        dict(keyword_id=5, item_id=2),
+        dict(keyword_id=3, item_id=3),
+        dict(keyword_id=4, item_id=3),
+        dict(keyword_id=7, item_id=2),
+        dict(keyword_id=6, item_id=3)
+    )
+
+class Fixtures(object):
+    @property
+    def user_address_result(self):
+        return [
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+            User(id=8, addresses=[
+                Address(id=2, email_address='ed@wood.com'),
+                Address(id=3, email_address='ed@bettyboop.com'),
+                Address(id=4, email_address='ed@lala.com'),
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=10, addresses=[])
+        ]
+
+    @property
+    def user_all_result(self):
+        return [
+            User(id=7, addresses=[
+                Address(id=1)
+            ], orders=[
+                Order(description='order 1', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]),
+                Order(description='order 3'),
+                Order(description='order 5'),
+            ]), 
+            User(id=8, addresses=[
+                Address(id=2),
+                Address(id=3),
+                Address(id=4)
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ], orders=[
+                Order(description='order 2', items=[Item(description='item 1'), Item(description='item 2'), Item(description='item 3')]),
+                Order(description='order 4', items=[Item(description='item 1'), Item(description='item 5')]),
+            ]), 
+            User(id=10, addresses=[])
+        ]
+
+    @property
+    def user_order_result(self):
+        return [
+            User(id=7, orders=[
+                Order(id=1, items=[Item(id=1), Item(id=2), Item(id=3)]),
+                Order(id=3, items=[Item(id=3), Item(id=4), Item(id=5)]),
+                Order(id=5, items=[Item(id=5)]),
+            ]),
+            User(id=8, orders=[]),
+            User(id=9, orders=[
+                Order(id=2, items=[Item(id=1), Item(id=2), Item(id=3)]),
+                Order(id=4, items=[Item(id=1), Item(id=5)]),
+            ]),
+            User(id=10)
+        ] 
+    
+    @property
+    def item_keyword_result(self):
+        return [
+            Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
+            Item(id=2, keywords=[Keyword(name='red'), Keyword(name='small'), Keyword(name='square')]),
+            Item(id=3, keywords=[Keyword(name='green'), Keyword(name='big'), Keyword(name='round')]),
+            Item(id=4, keywords=[]),
+            Item(id=5, keywords=[]),
+        ]
+fixtures = Fixtures()
diff --git a/test/orm/lazy_relations.py b/test/orm/lazy_relations.py
new file mode 100644 (file)
index 0000000..5accb74
--- /dev/null
@@ -0,0 +1,265 @@
+"""basic tests of lazy loaded attributes"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+
+from fixtures import *
+from query import QueryTest
+
+class LazyTest(QueryTest):
+    keep_mappers = False
+
+    def setup_mappers(self):
+        pass
+        
+    def test_basic(self):
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=True)
+        })
+        sess = create_session()
+        q = sess.query(User)
+        assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
+
+    def test_bindstosession(self):
+        """test that lazy loaders use the mapper's contextual session if the parent instance
+        is not in a session, and that an error is raised if no contextual session"""
+        
+        from sqlalchemy.ext.sessioncontext import SessionContext
+        ctx = SessionContext(create_session)
+        m = mapper(User, users, properties = dict(
+            addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True)
+        ), extension=ctx.mapper_extension)
+        q = ctx.current.query(m)
+        u = q.filter(users.c.id == 7).first()
+        ctx.current.expunge(u)
+        assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u
+
+        clear_mappers()
+
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=True)
+        })
+        try:
+            sess = create_session()
+            q = sess.query(User)
+            u = q.filter(users.c.id == 7).first()
+            sess.expunge(u)
+            assert User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')]) == u
+        except exceptions.InvalidRequestError, err:
+            assert "not bound to a Session, and no contextual session" in str(err)
+
+    def test_orderby(self):
+        mapper(User, users, properties = {
+            'addresses':relation(mapper(Address, addresses), lazy=True, order_by=addresses.c.email_address),
+        })
+        q = create_session().query(User)
+        assert [
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+            User(id=8, addresses=[
+                Address(id=3, email_address='ed@bettyboop.com'),
+                Address(id=4, email_address='ed@lala.com'),
+                Address(id=2, email_address='ed@wood.com')
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=10, addresses=[])
+        ] == q.all()
+        
+    def test_orderby_secondary(self):
+        """tests that a regular mapper select on a single table can order by a relation to a second table"""
+        
+        mapper(Address, addresses)
+
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=True),
+        ))
+        q = create_session().query(User)
+        l = q.filter(users.c.id==addresses.c.user_id).order_by(addresses.c.email_address).all()
+        assert [
+            User(id=8, addresses=[
+                Address(id=2, email_address='ed@wood.com'),
+                Address(id=3, email_address='ed@bettyboop.com'),
+                Address(id=4, email_address='ed@lala.com'),
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+        ] == l
+
+    def test_orderby_desc(self):
+        mapper(Address, addresses)
+
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy=True,  order_by=[desc(addresses.c.email_address)]),
+        ))
+        sess = create_session()
+        assert [
+            User(id=7, addresses=[
+                Address(id=1)
+            ]), 
+            User(id=8, addresses=[
+                Address(id=2, email_address='ed@wood.com'),
+                Address(id=4, email_address='ed@lala.com'),
+                Address(id=3, email_address='ed@bettyboop.com'),
+            ]), 
+            User(id=9, addresses=[
+                Address(id=5)
+            ]), 
+            User(id=10, addresses=[])
+        ] == sess.query(User).all()
+
+    def test_no_orphan(self):
+        """test that a lazily loaded child object is not marked as an orphan"""
+
+        mapper(User, users, properties={
+            'addresses':relation(Address, cascade="all,delete-orphan", lazy=True)
+        })
+        mapper(Address, addresses)
+
+        sess = create_session()
+        user = sess.query(User).get(7)
+        assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
+        assert not class_mapper(Address)._is_orphan(user.addresses[0])
+
+    def test_limit(self):
+        """test limit operations combined with lazy-load relationships."""
+        
+        mapper(Item, items)
+        mapper(Order, orders, properties={
+            'items':relation(Item, secondary=order_items, lazy=True)
+        })
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=True),
+            'orders':relation(Order, lazy=True)
+        })
+
+        sess = create_session()
+        q = sess.query(User)
+
+        if testbase.db.engine.name == 'mssql':
+            l = q.limit(2).all()
+            assert fixtures.user_all_result[:2] == l
+        else:        
+            l = q.limit(2).offset(1).all()
+            assert fixtures.user_all_result[1:3] == l
+
+    def test_distinct(self):
+        mapper(Item, items)
+        mapper(Order, orders, properties={
+            'items':relation(Item, secondary=order_items, lazy=True)
+        })
+        mapper(User, users, properties={
+            'addresses':relation(mapper(Address, addresses), lazy=True),
+            'orders':relation(Order, lazy=True)
+        })
+
+        sess = create_session()
+        q = sess.query(User)
+
+        # use a union all to get a lot of rows to join against
+        u2 = users.alias('u2')
+        s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
+        print [key for key in s.c.keys()]
+        l = q.filter(s.c.u2_id==User.c.id).distinct().all()
+        assert fixtures.user_all_result == l
+
+    def test_one_to_many_scalar(self):
+        mapper(User, users, properties = dict(
+            address = relation(mapper(Address, addresses), lazy=True, uselist=False)
+        ))
+        q = create_session().query(User)
+        l = q.filter(users.c.id == 7).all()
+        assert [User(id=7, address=Address(id=1))] == l
+
+    def test_double(self):
+        """tests lazy loading with two relations simulatneously, from the same table, using aliases.  """
+        openorders = alias(orders, 'openorders')
+        closedorders = alias(orders, 'closedorders')
+
+        mapper(Address, addresses)
+        
+        mapper(User, users, properties = dict(
+            addresses = relation(Address, lazy = True),
+            open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.id==openorders.c.user_id), lazy=True),
+            closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.id==closedorders.c.user_id), lazy=True)
+        ))
+        q = create_session().query(User)
+
+        assert [
+            User(
+                id=7,
+                addresses=[Address(id=1)],
+                open_orders = [Order(id=3)],
+                closed_orders = [Order(id=1), Order(id=5)]
+            ),
+            User(
+                id=8,
+                addresses=[Address(id=2), Address(id=3), Address(id=4)],
+                open_orders = [],
+                closed_orders = []
+            ),
+            User(
+                id=9,
+                addresses=[Address(id=5)],
+                open_orders = [Order(id=4)],
+                closed_orders = [Order(id=2)]
+            ),
+            User(id=10)
+        
+        ] == q.all()
+
+    def test_many_to_many(self):
+
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relation(Keyword, secondary=item_keywords, lazy=True),
+        ))
+        
+        q = create_session().query(Item)
+        assert fixtures.item_keyword_result == q.all()
+
+        assert fixtures.item_keyword_result[0:2] == q.join('keywords').filter(keywords.c.name == 'red').all()
+
+    def test_uses_get(self):
+        """test that a simple many-to-one lazyload optimizes to use query.get()."""
+
+        mapper(Address, addresses, properties = dict(
+            user = relation(mapper(User, users), lazy=True)
+        ))
+        
+        sess = create_session()
+        
+        # load address
+        a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one()
+        
+        # load user that is attached to the address
+        u1 = sess.query(User).get(8)
+        
+        def go():
+            # lazy load of a1.user should get it from the session
+            assert a1.user is u1
+        self.assert_sql_count(testbase.db, go, 0)
+
+    def test_many_to_one(self):
+        mapper(Address, addresses, properties = dict(
+            user = relation(mapper(User, users), lazy=True)
+        ))
+        sess = create_session()
+        q = sess.query(Address)
+        a = q.filter(addresses.c.id==1).one()
+
+        assert a.user is not None
+        
+        u1 = sess.query(User).get(7)
+        
+        assert a.user is u1
+
+if __name__ == '__main__':
+    testbase.main()
index 197f988b7aeff89df8e2cdd221cb0b72f31da928..f366d4866efd354d95a9734e9984859e06b4f6a0 100644 (file)
@@ -990,742 +990,6 @@ class NoLoadTest(MapperSuperTest):
             )
 
 
-class LazyTest(MapperSuperTest):
-
-    def testbasic(self):
-        """tests a basic one-to-many lazy load"""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True)
-        ))
-        q = create_session().query(m)
-        l = q.select(users.c.user_id == 7)
-        self.assert_result(l, User,
-            {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
-            )
-
-    def testbindstosession(self):
-        ctx = SessionContext(create_session)
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses, extension=ctx.mapper_extension), lazy=True)
-        ), extension=ctx.mapper_extension)
-        q = ctx.current.query(m)
-        u = q.filter(users.c.user_id == 7).selectfirst()
-        ctx.current.expunge(u)
-        self.assert_result([u], User,
-            {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])},
-            )
-    
-    def testcreateinstance(self):
-        class Ext(MapperExtension):
-            def create_instance(self, *args, **kwargs):
-                return User()
-        m = mapper(Address, addresses)
-        m = mapper(User, users, extension=Ext(), properties = dict(
-            addresses = relation(Address, lazy=True),
-        ))
-        
-        q = create_session().query(m)
-        l = q.select();
-        self.assert_result(l, User, *user_address_result)
-        
-    def testorderby(self):
-        m = mapper(Address, addresses)
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = True, order_by=addresses.c.email_address),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-
-        self.assert_result(l, User,
-            {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
-            {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])},
-            {'user_id' : 9, 'addresses' : (Address, [])}
-            )
-
-    def testorderby_select(self):
-        """tests that a regular mapper select on a single table can order by a relation to a second table"""
-        m = mapper(Address, addresses)
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = True),
-        ))
-        q = create_session().query(m)
-        l = q.select(users.c.user_id==addresses.c.user_id, order_by=addresses.c.email_address)
-
-        self.assert_result(l, User,
-            {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, ])},
-            {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
-        )
-        
-    def testorderby_desc(self):
-        m = mapper(Address, addresses)
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = True, order_by=[desc(addresses.c.email_address)]),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-
-        self.assert_result(l, User,
-            {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
-            {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}])},
-            {'user_id' : 9, 'addresses' : (Address, [])},
-            )
-
-    def testorphanstate(self):
-        """test that a lazily loaded child object is not marked as an orphan"""
-        m = mapper(User, users, properties={
-            'addresses':relation(Address, cascade="all,delete-orphan", lazy=True)
-        })
-        mapper(Address, addresses)
-
-        q = create_session().query(m)
-        user = q.get(7)
-        assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
-        assert not class_mapper(Address)._is_orphan(user.addresses[0])
-        
-    def testlimit(self):
-        ordermapper = mapper(Order, orders, properties = dict(
-                items = relation(mapper(Item, orderitems), lazy = True)
-            ))
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True),
-            orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = True),
-        ))
-        sess= create_session()
-        q = sess.query(m)
-        
-        if db.engine.name == 'mssql':
-            l = q.select(limit=2)
-            self.assert_result(l, User, *user_all_result[:2])
-        else:        
-            l = q.select(limit=2, offset=1)
-            self.assert_result(l, User, *user_all_result[1:3])
-
-        # use a union all to get a lot of rows to join against
-        u2 = users.alias('u2')
-        s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
-        print [key for key in s.c.keys()]
-        l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True)
-        self.assert_result(l, User, *user_all_result)
-        
-        sess.clear()
-        clear_mappers()
-        m = mapper(Item, orderitems, properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),
-            ))
-        
-        l = sess.query(m).select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2)        
-        self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
-
-    def testonetoone(self):
-        m = mapper(User, users, properties = dict(
-            address = relation(mapper(Address, addresses), lazy = True, uselist = False)
-        ))
-        q = create_session().query(m)
-        l = q.select(users.c.user_id == 7)
-        self.assert_result(l, User, {'user_id':7, 'address' : (Address, {'address_id':1})})
-
-    def testbackwardsonetoone(self):
-        m = mapper(Address, addresses, properties = dict(
-            user = relation(mapper(User, users), lazy = True)
-        ))
-        q = create_session().query(m)
-        l = q.select(addresses.c.address_id == 1)
-        self.echo(repr(l))
-        print repr(l[0].__dict__)
-        self.echo(repr(l[0].user))
-        self.assert_(l[0].user is not None)
-
-    def testuseget(self):
-        """test that a simple many-to-one lazyload optimizes to use query.get().
-        
-        this is done currently by comparing the 'get' SQL clause of the query
-        to the 'lazy' SQL clause of the lazy loader, so it relies heavily on 
-        ClauseElement.compare()"""
-        
-        m = mapper(Address, addresses, properties = dict(
-            user = relation(mapper(User, users), lazy = True)
-        ))
-        sess = create_session()
-        a1 = sess.query(Address).get_by(email_address = "ed@wood.com")
-        u1 = sess.query(User).get(8)
-        def go():
-            assert a1.user is u1
-        self.assert_sql_count(db, go, 0)
-
-    def testdouble(self):
-        """tests lazy loading with two relations simulatneously, from the same table, using aliases.  """
-        openorders = alias(orders, 'openorders')
-        closedorders = alias(orders, 'closedorders')
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = True),
-            open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = True),
-            closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = True)
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User,
-            {'user_id' : 7, 
-                'addresses' : (Address, [{'address_id' : 1}]),
-                'open_orders' : (Order, [{'order_id' : 3}]),
-                'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},])
-            },
-            {'user_id' : 8, 
-                'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
-                'open_orders' : (Order, []),
-                'closed_orders' : (Order, [])
-            },
-            {'user_id' : 9, 
-                'addresses' : (Address, []),
-                'open_orders' : (Order, [{'order_id' : 4}]),
-                'closed_orders' : (Order, [{'order_id' : 2}])
-            }
-            )
-
-    def testmanytomany(self):
-        """tests a many-to-many lazy load"""
-        mapper(Item, orderitems, properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True),
-            ))
-        q = create_session().query(Item)
-        l = q.select()
-        self.assert_result(l, Item, 
-            {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
-            {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
-            {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
-            {'item_id' : 4, 'keywords' : (Keyword, [])},
-            {'item_id' : 5, 'keywords' : (Keyword, [])}
-        )
-        l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, Item.c.item_id==itemkeywords.c.item_id))
-        self.assert_result(l, Item, 
-            {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
-            {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
-        )
-    
-
-        
-class EagerTest(MapperSuperTest):
-    def testbasic(self):
-        """tests a basic one-to-many eager load"""
-        m = mapper(Address, addresses)
-        
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = False),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User, *user_address_result)
-        
-    def testorderby(self):
-        m = mapper(Address, addresses)
-        
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = False, order_by=addresses.c.email_address),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User,
-            {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
-            {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])},
-            {'user_id' : 9, 'addresses' : (Address, [])}
-            )
-
-        
-    def testorderby_desc(self):
-        m = mapper(Address, addresses)
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-
-        self.assert_result(l, User,
-            {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])},
-            {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'},  {'email_address':'ed@bettyboop.com'}, ])},
-            {'user_id' : 9, 'addresses' : (Address, [])},
-            )
-    
-    def testlimit(self):
-        ordermapper = mapper(Order, orders, properties = dict(
-                items = relation(mapper(Item, orderitems), lazy = False)
-            ))
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False),
-            orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
-        ))
-        sess = create_session()
-        q = sess.query(m)
-        
-        if db.engine.name == 'mssql':
-            l = q.select(limit=2)
-            self.assert_result(l, User, *user_all_result[:2])
-        else:        
-            l = q.select(limit=2, offset=1)
-            self.assert_result(l, User, *user_all_result[1:3])
-
-        # this is an involved 3x union of the users table to get a lot of rows.
-        # then see if the "distinct" works its way out.  you actually get the same
-        # result with or without the distinct, just via less or more rows.
-        u2 = users.alias('u2')
-        s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
-        l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True)
-        self.assert_result(l, User, *user_all_result)
-        sess.clear()
-        clear_mappers()
-        m = mapper(Item, orderitems, properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
-            ))
-        q = sess.query(m)
-        l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2)        
-        self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
-        
-    def testmorelimit(self):
-        """test that the ORDER BY is propigated from the inner select to the outer select, when using the 
-        'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
-        ordermapper = mapper(Order, orders, properties = dict(
-                items = relation(mapper(Item, orderitems), lazy = False)
-            ))
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False),
-            orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
-        ))
-        sess = create_session()
-        q = sess.query(m)
-        
-        if db.engine.name != 'mssql':
-            l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
-            self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
-        
-    def testonetoone(self):
-        m = mapper(User, users, properties = dict(
-            address = relation(mapper(Address, addresses), lazy = False, uselist = False)
-        ))
-        q = create_session().query(m)
-        l = q.select(users.c.user_id == 7)
-        self.assert_result(l, User,
-            {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})},
-            )
-
-    def testbackwardsonetoone(self):
-        m = mapper(Address, addresses, properties = dict(
-            user = relation(mapper(User, users), lazy = False)
-        )).compile()
-        self.echo(repr(m.props['user'].uselist))
-        q = create_session().query(m)
-        l = q.select(addresses.c.address_id == 1)
-        self.assert_result(l, Address, 
-            {'address_id' : 1, 'email_address' : 'jack@bean.com', 
-                'user' : (User, {'user_id' : 7, 'user_name' : 'jack'}) 
-            },
-        )
-
-    def testorphanstate(self):
-        """test that an eagerly loaded child object is not marked as an orphan"""
-        m = mapper(User, users, properties={
-            'addresses':relation(Address, cascade="all,delete-orphan", lazy=False)
-        })
-        mapper(Address, addresses)
-        
-        s = create_session()
-        q = s.query(m)
-        user = q.get(7)
-        assert getattr(User, 'addresses').hasparent(user.addresses[0], optimistic=True)
-        assert not class_mapper(Address)._is_orphan(user.addresses[0])
-        
-    def testwithrepeat(self):
-        """tests a one-to-many eager load where we also query on joined criterion, where the joined
-        criterion is using the same tables that are used within the eager load.  the mapper must insure that the 
-        criterion doesnt interfere with the eager load criterion."""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False)
-        ))
-        q = create_session().query(m)
-        l = q.select(and_(addresses.c.email_address == 'ed@lala.com', addresses.c.user_id==users.c.user_id))
-        self.assert_result(l, User,
-            {'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])},
-        )
-        
-    def testcircular(self):
-        """test that a circular eager relationship breaks the cycle with a lazy loader"""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy=False, backref=backref('user', lazy=False))
-        ))
-        assert class_mapper(User).props['addresses'].lazy is False
-        assert class_mapper(Address).props['user'].lazy is False
-        session = create_session()
-        l = session.query(User).select()
-        self.assert_result(l, User, *user_address_result)
-        
-    def testcompile(self):
-        """tests deferred operation of a pre-compiled mapper statement"""
-        session = create_session()
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False)
-        ))
-        s = session.query(m).compile(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.user_id))
-        c = s.compile()
-        self.echo("\n" + str(c) + repr(c.get_params()))
-        
-        l = m.instances(s.execute(emailad = 'jack@bean.com'), session)
-        self.echo(repr(l))
-    
-    def testonselect(self):
-        """test eager loading of a mapper which is against a select"""
-        
-        s = select([orders], orders.c.isopen==1).alias('openorders')
-        mapper(Order, s, properties={
-            'user':relation(User, lazy=False)
-        })
-        mapper(User, users)
-        
-        q = create_session().query(Order)
-        self.assert_result(q.list(), Order,
-            {'order_id':3, 'user' : (User, {'user_id':7})},
-            {'order_id':4, 'user' : (User, {'user_id':9})},
-        )
-
-        q = q.select_from(s.outerjoin(orderitems)).filter(orderitems.c.item_name != 'item 2')
-        self.assert_result(q.list(), Order,
-            {'order_id':3, 'user' : (User, {'user_id':7})},
-        )
-        
-        
-    def testmulti(self):
-        """tests eager loading with two relations simultaneously"""
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False),
-            orders = relation(mapper(Order, orders), lazy = False),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User,
-            {'user_id' : 7, 
-                'addresses' : (Address, [{'address_id' : 1}]),
-                'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},])
-            },
-            {'user_id' : 8, 
-                'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
-                'orders' : (Order, [])
-            },
-            {'user_id' : 9, 
-                'addresses' : (Address, []),
-                'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}])
-            }
-            )
-
-    def testdouble(self):
-        """tests eager loading with two relations simultaneously, from the same table.  """
-        openorders = alias(orders, 'openorders')
-        closedorders = alias(orders, 'closedorders')
-        ordermapper = mapper(Order, orders)
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False),
-            open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False),
-            closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False)
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User,
-            {'user_id' : 7, 
-                'addresses' : (Address, [{'address_id' : 1}]),
-                'open_orders' : (Order, [{'order_id' : 3}]),
-                'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},])
-            },
-            {'user_id' : 8, 
-                'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]),
-                'open_orders' : (Order, []),
-                'closed_orders' : (Order, [])
-            },
-            {'user_id' : 9, 
-                'addresses' : (Address, []),
-                'open_orders' : (Order, [{'order_id' : 4}]),
-                'closed_orders' : (Order, [{'order_id' : 2}])
-            }
-            )
-
-    def testdoublewithscalar(self):
-        """tests eager loading with two relations from the same table, with one of them joining to the parent User.  the other is the primary mapper.  doesn't re-test addresses relation."""
-        max_orders_by_user = select([func.max(orders.c.order_id).label('order_id')], group_by=[orders.c.user_id]).alias('max_orders_by_user')
-        max_orders = orders.select(orders.c.order_id==max_orders_by_user.c.order_id).alias('max_orders')
-        m = mapper(User, users, properties={
-               'orders':relation(mapper(Order, orders), backref='user', lazy=False),
-               'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
-               })
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User,
-            {'user_id' : 7, 
-                'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]),
-                'max_order' : (Order, {'order_id' : 5})
-            },
-            {'user_id' : 8, 
-                'orders' : (Order, []),
-                'max_order' : None,
-            },
-            {'user_id' : 9, 
-                'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]),
-                'max_order' : (Order, {'order_id' : 4})
-            }
-            )
-
-    def testnested(self):
-        """tests eager loading of a parent item with two types of child items,
-        where one of those child items eager loads its own child items."""
-        ordermapper = mapper(Order, orders, properties = dict(
-                items = relation(mapper(Item, orderitems), lazy = False)
-            ))
-
-        m = mapper(User, users, properties = dict(
-            addresses = relation(mapper(Address, addresses), lazy = False),
-            orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False),
-        ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, User, *user_all_result)
-    
-    def testmanytomany(self):
-        items = orderitems
-        m = mapper(Item, items, properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]),
-            ))
-        q = create_session().query(m)
-        l = q.select()
-        self.assert_result(l, Item, *item_keyword_result)
-        
-        l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
-        self.assert_result(l, Item, 
-            {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
-            {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
-        )
-
-    
-    def testmanytomanyoptions(self):
-        items = orderitems
-        
-        m = mapper(Item, items, properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]),
-            ))
-        q = create_session().query(m).options(eagerload('keywords'))
-        def go():
-            l = q.select()
-            self.assert_result(l, Item, *item_keyword_result)
-        self.assert_sql_count(db, go, 1)
-        
-        def go():
-            l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id))
-            self.assert_result(l, Item, 
-                {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])},
-                {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])},
-            )
-        self.assert_sql_count(db, go, 1)
-        
-    def testoneandmany(self):
-        """tests eager load for a parent object with a child object that 
-        contains a many-to-many relationship to a third object."""
-        items = orderitems
-
-        m = mapper(Item, items, 
-            properties = dict(
-                keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]),
-            ))
-
-        m = mapper(Order, orders, properties = dict(
-                items = relation(m, lazy = False)
-            ))
-        q = create_session().query(m)
-        l = q.select("orders.order_id in (1,2,3)")
-        self.assert_result(l, Order,
-            {'order_id' : 1, 'items': (Item, [])}, 
-            {'order_id' : 2, 'items': (Item, [
-                {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, 
-                {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])}
-               ])},
-            {'order_id' : 3, 'items': (Item, [
-                {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, 
-                {'item_id':4, 'item_name':'item 4'}, 
-                {'item_id':5, 'item_name':'item 5'}
-               ])},
-        )
-
-class InstancesTest(MapperSuperTest):
-    def testcustomfromalias(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=True)
-        })
-        mapper(Address, addresses)
-        query = users.select(users.c.user_id==7).union(users.select(users.c.user_id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.user_id', addresses.c.address_id])
-        q = create_session().query(User)
-        
-        def go():
-            l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute())
-            self.assert_result(l, User, *user_address_result)
-        self.assert_sql_count(testbase.db, go, 1)
-        
-    def testcustomeagerquery(self):
-        mapper(User, users, properties={
-            # setting lazy=True - the contains_eager() option below
-            # should imply eagerload()
-            'addresses':relation(Address, lazy=True)
-        })
-        mapper(Address, addresses)
-        
-        selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id])
-        q = create_session().query(User)
-        
-        def go():
-            l = q.options(contains_eager('addresses')).instances(selectquery.execute())
-            self.assert_result(l, User, *user_address_result)
-        self.assert_sql_count(testbase.db, go, 1)
-
-    def testcustomeagerwithstringalias(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=False)
-        })
-        mapper(Address, addresses)
-
-        adalias = addresses.alias('adalias')
-        selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
-        q = create_session().query(User)
-
-        def go():
-            l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute())
-            self.assert_result(l, User, *user_address_result)
-        self.assert_sql_count(testbase.db, go, 1)
-
-    def testcustomeagerwithalias(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=False)
-        })
-        mapper(Address, addresses)
-
-        adalias = addresses.alias('adalias')
-        selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
-        q = create_session().query(User)
-
-        def go():
-            l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
-            self.assert_result(l, User, *user_address_result)
-        self.assert_sql_count(testbase.db, go, 1)
-
-    def testcustomeagerwithdecorator(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=False)
-        })
-        mapper(Address, addresses)
-
-        adalias = addresses.alias('adalias')
-        selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.user_id, adalias.c.address_id])
-        def decorate(row):
-            d = {}
-            for c in addresses.columns:
-                d[c] = row[adalias.corresponding_column(c)]
-            return d
-            
-        q = create_session().query(User)
-
-        def go():
-            l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute())
-            self.assert_result(l, User, *user_address_result)
-        self.assert_sql_count(testbase.db, go, 1)
-    
-    def testmultiplemappers(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=True)
-        })
-        mapper(Address, addresses)
-
-        sess = create_session()
-        
-        (user7, user8, user9) = sess.query(User).select()
-        (address1, address2, address3, address4) = sess.query(Address).select()
-        
-        selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.user_id, addresses.c.address_id])
-        q = sess.query(User)
-        l = q.instances(selectquery.execute(), Address)
-        # note the result is a cartesian product
-        assert l == [
-            (user7, address1),
-            (user8, address2),
-            (user8, address3),
-            (user8, address4),
-            (user9, None)
-        ]
-    
-    def testmultipleonquery(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=True)
-        })
-        mapper(Address, addresses)
-        sess = create_session()
-        (user7, user8, user9) = sess.query(User).select()
-        (address1, address2, address3, address4) = sess.query(Address).select()
-        q = sess.query(User)
-        q = q.add_entity(Address).outerjoin('addresses')
-        l = q.list()
-        assert l == [
-            (user7, address1),
-            (user8, address2),
-            (user8, address3),
-            (user8, address4),
-            (user9, None)
-        ]
-
-    def testcolumnonquery(self):
-        mapper(User, users, properties={
-            'addresses':relation(Address, lazy=True)
-        })
-        mapper(Address, addresses)
-        
-        sess = create_session()
-        (user7, user8, user9) = sess.query(User).select()
-        q = sess.query(User)
-        q = q.group_by([c for c in users.c]).order_by(User.c.user_id).outerjoin('addresses').add_column(func.count(addresses.c.address_id).label('count'))
-        l = q.list()
-        assert l == [
-            (user7, 1),
-            (user8, 3),
-            (user9, 0)
-        ], repr(l)
-        
-    def testmapperspluscolumn(self):
-        mapper(User, users)
-        s = select([users, func.count(addresses.c.address_id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id])
-        sess = create_session()
-        (user7, user8, user9) = sess.query(User).select()
-        q = sess.query(User)
-        l = q.instances(s.execute(), "count")
-        assert l == [
-            (user7, 1),
-            (user8, 3),
-            (user9, 0)
-        ]
-        
-    def testmappersplustwocolumns(self):
-        mapper(User, users)
-
-        # Fixme ticket #475!
-        if db.engine.name == 'mysql':
-            col2 = func.concat("Name:", users.c.user_name).label('concat')
-        else:
-            col2 = ("Name:" + users.c.user_name).label('concat')
-        
-        s = select([users, func.count(addresses.c.address_id).label('count'), col2], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.user_id])
-        sess = create_session()
-        (user7, user8, user9) = sess.query(User).select()
-        q = sess.query(User)
-        l = q.instances(s.execute(), "count", "concat")
-        print l
-        assert l == [
-            (user7, 1, "Name:jack"),
-            (user8, 3, "Name:ed"),
-            (user9, 0, "Name:fred")
-        ]
 
 
 if __name__ == "__main__":    
diff --git a/test/orm/query.py b/test/orm/query.py
new file mode 100644 (file)
index 0000000..f3172a1
--- /dev/null
@@ -0,0 +1,347 @@
+from sqlalchemy import *
+from sqlalchemy.orm import *
+import testbase
+from fixtures import *
+
+class Base(object):
+    def __init__(self, **kwargs):
+        for k in kwargs:
+            setattr(self, k, kwargs[k])
+            
+    def __ne__(self, other):
+        return not self.__eq__(other)
+        
+    def __eq__(self, other):
+        """'passively' compare this object to another.
+        
+        only look at attributes that are present on the source object.
+        
+        """
+        # use __dict__ to avoid instrumented properties
+        for attr in self.__dict__.keys():
+            if attr[0] == '_':
+                continue
+            value = getattr(self, attr)
+            if hasattr(value, '__iter__') and not isinstance(value, basestring):
+                if len(value) == 0:
+                    continue
+                for (us, them) in zip(value, getattr(other, attr)):
+                    if us != them:
+                        return False
+                else:
+                    continue
+            else:
+                if value is not None:
+                    if value != getattr(other, attr):
+                        return False
+        else:
+            return True
+
+class QueryTest(testbase.ORMTest):
+    keep_mappers = True
+    keep_data = True
+    
+    def setUpAll(self):
+        super(QueryTest, self).setUpAll()
+        install_fixture_data()
+        self.setup_mappers()
+    
+    def tearDownAll(self):
+        clear_mappers()
+        super(QueryTest, self).tearDownAll()
+          
+    def define_tables(self, meta):
+        # a slight dirty trick here. 
+        meta.tables = metadata.tables
+        metadata.connect(meta.engine)
+        
+    def setup_mappers(self):
+        mapper(User, users, properties={
+            'addresses':relation(Address),
+            'orders':relation(Order, backref='user'), # o2m, m2o
+        })
+        mapper(Address, addresses)
+        mapper(Order, orders, properties={
+            'items':relation(Item, secondary=order_items),  #m2m
+            'address':relation(Address),  # m2o
+        })
+        mapper(Item, items, properties={
+            'keywords':relation(Keyword, secondary=item_keywords) #m2m
+        })
+        mapper(Keyword, keywords)
+
+            
+class GetTest(QueryTest):
+    def test_get(self):
+        s = create_session()
+        assert s.query(User).get(19) is None
+        u = s.query(User).get(7)
+        u2 = s.query(User).get(7)
+        assert u is u2
+        s.clear()
+        u2 = s.query(User).get(7)
+        assert u is not u2
+
+    def test_unicode(self):
+        """test that Query.get properly sets up the type for the bind parameter.  using unicode would normally fail 
+        on postgres, mysql and oracle unless it is converted to an encoded string"""
+        
+        table = Table('unicode_data', users.metadata, 
+            Column('id', Unicode(40), primary_key=True),
+            Column('data', Unicode(40)))
+        table.create()
+        ustring = 'petit voix m\xe2\x80\x99a '.decode('utf-8')
+        table.insert().execute(id=ustring, data=ustring)
+        class LocalFoo(Base):pass
+        mapper(LocalFoo, table)
+        assert create_session().query(LocalFoo).get(ustring) == LocalFoo(id=ustring, data=ustring)
+
+class CompileTest(QueryTest):
+    def test_deferred(self):
+        session = create_session()
+        s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.id)).compile()
+        
+        l = session.query(User).instances(s.execute(emailad = 'jack@bean.com'))
+        assert [User(id=7)] == l
+    
+class SliceTest(QueryTest):
+    def test_first(self):
+        assert  User(id=7) == create_session().query(User).first()
+        
+        assert create_session().query(User).filter(users.c.id==27).first() is None
+        
+class FilterTest(QueryTest):
+    def test_basic(self):
+        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()
+
+    def test_onefilter(self):
+        assert [User(id=8), User(id=9)] == create_session().query(User).filter(users.c.name.endswith('ed')).all()
+
+        
+class ParentTest(QueryTest):
+    def test_o2m(self):
+        sess = create_session()
+        q = sess.query(User)
+        
+        u1 = q.filter_by(name='jack').one()
+
+        # test auto-lookup of property
+        o = sess.query(Order).with_parent(u1).all()
+        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+        # test with explicit property
+        o = sess.query(Order).with_parent(u1, property='orders').all()
+        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+        # test static method
+        o = Query.query_from_parent(u1, property='orders', session=sess).all()
+        assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+
+        # test generative criterion
+        o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
+        assert [Order(description="order 3"), Order(description="order 5")] == o
+
+    def test_noparent(self):
+        sess = create_session()
+        q = sess.query(User)
+        
+        u1 = q.filter_by(name='jack').one()
+
+        try:
+            q = sess.query(Item).with_parent(u1)
+            assert False
+        except exceptions.InvalidRequestError, e:
+            assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'"
+
+    def test_m2m(self):
+        sess = create_session()
+        i1 = sess.query(Item).filter_by(id=2).one()
+        k = sess.query(Keyword).with_parent(i1).all()
+        assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k
+        
+    
+class JoinTest(QueryTest):
+    def test_overlapping_paths(self):
+        # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
+        result = create_session().query(User).join(['orders', 'items']).filter_by(id=3).reset_joinpoint().join(['orders','address']).filter_by(id=1).all()
+        assert [User(id=7, name='jack')] == result
+
+    def test_overlapping_paths_outerjoin(self):
+        result = create_session().query(User).outerjoin(['orders', 'items']).filter_by(id=3).reset_joinpoint().outerjoin(['orders','address']).filter_by(id=1).all()
+        assert [User(id=7, name='jack')] == result
+    
+    def test_overlap_with_aliases(self):
+        oalias = orders.alias('oalias')
+
+        result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).all()
+        assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
+        
+        result = create_session().query(User).select_from(users.join(oalias)).filter(oalias.c.description.in_("order 1", "order 2", "order 3")).join(['orders', 'items']).filter_by(id=4).all()
+        assert [User(id=7, name='jack')] == result
+
+
+class SynonymTest(QueryTest):
+    keep_mappers = True
+    keep_data = True
+
+    def setup_mappers(self):
+        mapper(User, users, properties={
+            'name_syn':synonym('name'),
+            'addresses':relation(Address),
+            'orders':relation(Order, backref='user'), # o2m, m2o
+            'orders_syn':synonym('orders')
+        })
+        mapper(Address, addresses)
+        mapper(Order, orders, properties={
+            'items':relation(Item, secondary=order_items),  #m2m
+            'address':relation(Address),  # m2o
+            'items_syn':synonym('items')
+        })
+        mapper(Item, items, properties={
+            'keywords':relation(Keyword, secondary=item_keywords) #m2m
+        })
+        mapper(Keyword, keywords)
+
+    def test_joins(self):
+        for j in (
+            ['orders', 'items'],
+            ['orders_syn', 'items'],
+            ['orders', 'items_syn'],
+            ['orders_syn', 'items_syn'],
+        ):
+            result = create_session().query(User).join(j).filter_by(id=3).all()
+            assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
+
+    def test_with_parent(self):
+        for nameprop, orderprop in (
+            ('name', 'orders'),
+            ('name_syn', 'orders'),
+            ('name', 'orders_syn'),
+            ('name_syn', 'orders_syn'),
+        ):
+            sess = create_session()
+            q = sess.query(User)
+        
+            u1 = q.filter_by(**{nameprop:'jack'}).one()
+
+            o = sess.query(Order).with_parent(u1, property=orderprop).all()
+            assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+        
+class InstancesTest(QueryTest):
+
+    def test_from_alias(self):
+
+        query = users.select(users.c.id==7).union(users.select(users.c.id>7)).alias('ulist').outerjoin(addresses).select(use_labels=True,order_by=['ulist.id', addresses.c.id])
+        q = create_session().query(User)
+
+        def go():
+            l = q.options(contains_alias('ulist'), contains_eager('addresses')).instances(query.execute())
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_contains_eager(self):
+
+        selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
+        q = create_session().query(User)
+
+        def go():
+            l = q.options(contains_eager('addresses')).instances(selectquery.execute())
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_contains_eager_alias(self):
+        adalias = addresses.alias('adalias')
+        selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id])
+        q = create_session().query(User)
+
+        def go():
+            # test using a string alias name
+            l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute())
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+        def go():
+            # test using the Alias object itself
+            l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+        def decorate(row):
+            d = {}
+            for c in addresses.columns:
+                d[c] = row[adalias.corresponding_column(c)]
+            return d
+
+        def go():
+            # test using a custom 'decorate' function
+            l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute())
+            assert fixtures.user_address_result == l
+        self.assert_sql_count(testbase.db, go, 1)
+
+    def test_multi_mappers(self):
+        sess = create_session()
+
+        (user7, user8, user9, user10) = sess.query(User).all()
+        (address1, address2, address3, address4, address5) = sess.query(Address).all()
+
+        # note the result is a cartesian product
+        expected = [(user7, address1),
+            (user8, address2),
+            (user8, address3),
+            (user8, address4),
+            (user9, address5),
+            (user10, None)]
+        
+        selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
+        q = sess.query(User)
+        l = q.instances(selectquery.execute(), Address)
+        assert l == expected
+
+        q = sess.query(User)
+        q = q.add_entity(Address).outerjoin('addresses')
+        l = q.all()
+        assert l == expected
+
+        q = sess.query(User).add_entity(Address)
+        l = q.join('addresses').filter_by(email_address='ed@bettyboop.com').all()
+        assert l == [(user8, address3)]
+
+    def test_multi_columns(self):
+        sess = create_session()
+        (user7, user8, user9, user10) = sess.query(User).all()
+        expected = [(user7, 1),
+            (user8, 3),
+            (user9, 1),
+            (user10, 0)
+            ]
+            
+        q = sess.query(User)
+        q = q.group_by([c for c in users.c]).order_by(User.c.id).outerjoin('addresses').add_column(func.count(addresses.c.id).label('count'))
+        l = q.all()
+        assert l == expected
+
+        s = select([users, func.count(addresses.c.id).label('count')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=users.c.id)
+        q = sess.query(User)
+        l = q.instances(s.execute(), "count")
+        assert l == expected
+
+    @testbase.unsupported('mysql') # only because of "+" operator requiring "concat" in mysql (fix #475)
+    def test_two_columns(self):
+        sess = create_session()
+        (user7, user8, user9, user10) = sess.query(User).all()
+        expected = [
+            (user7, 1, "Name:jack"),
+            (user8, 3, "Name:ed"),
+            (user9, 1, "Name:fred"),
+            (user10, 0, "Name:chuck")]
+
+        s = select([users, func.count(addresses.c.id).label('count'), ("Name:" + users.c.name).label('concat')], from_obj=[users.outerjoin(addresses)], group_by=[c for c in users.c], order_by=[users.c.id])
+        q = create_session().query(User)
+        l = q.instances(s.execute(), "count", "concat")
+        assert l == expected
+
+
+if __name__ == '__main__':
+    testbase.main()
+
+