]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- port query options test suite over from 0.7, plus extra test for [ticket:2098]
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 17 Mar 2011 20:56:30 +0000 (16:56 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 17 Mar 2011 20:56:30 +0000 (16:56 -0400)
- apply fix for [ticket:2098] that just checks "if mapper is None".  0.7 has a
more comprehensive rework of this section already.

CHANGES
lib/sqlalchemy/orm/interfaces.py
test/orm/test_query.py

diff --git a/CHANGES b/CHANGES
index dd44030fbd08a5f57ca877d686c7905e988340bb..397845ab86f9d7af645605a262c2dae4017e5593 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -39,6 +39,12 @@ CHANGES
     in the session.  This will be an exception in 0.7.
     [ticket:2046]
 
+  - Fixed bug in query.options() whereby a path 
+    applied to a lazyload using string keys could 
+    overlap a same named attribute on the wrong 
+    entity.  Note 0.7 has an updated version of this
+    fix.  [ticket:2098]
+
 - sql
   - Column.copy(), as used in table.tometadata(), copies the 
     'doc' attribute.  [ticket:2028]
index 8211d7396f5262ac235485d6ffab898233400036..7b5e96554c1902ff4f0879e894eff91db4a0d73b 100644 (file)
@@ -839,8 +839,11 @@ class PropertyOption(MapperOption):
                     path_element = entity.path_entity
                     mapper = entity.mapper
                 mappers.append(mapper)
-                prop = mapper.get_property(token,
+                if mapper is not None:
+                    prop = mapper.get_property(token,
                         resolve_synonyms=True, raiseerr=raiseerr)
+                else:
+                    prop = None
                 key = token
             elif isinstance(token, PropComparator):
                 prop = token.property
index 120984e95a1aec7d434868fd7ea71b4dfffba86e..4b4d8a7cd70faecc5c542e0d190cc37b3bb38c0d 100644 (file)
@@ -4888,3 +4888,233 @@ class StatementOptionsTest(QueryTest):
     # TODO: Test that statement options are passed on to
     # updates/deletes, but currently there are no such options
     # applicable for them.
+
+class OptionsTest(QueryTest):
+    """Test the _get_paths() method of PropertyOption."""
+
+    def _option_fixture(self, *arg):
+        from sqlalchemy.orm import interfaces
+        class Opt(interfaces.PropertyOption):
+            pass
+        return Opt(arg)
+
+    def _make_path(self, path):
+        r = []
+        for i, item in enumerate(path):
+            if i % 2 == 0:
+                if isinstance(item, type):
+                    item = class_mapper(item)
+            r.append(item)
+        return tuple(r)
+
+    def _assert_path_result(self, opt, q, paths, mappers):
+        eq_(
+            opt._get_paths(q, False),
+            ([self._make_path(p) for p in paths], 
+            [class_mapper(c) for c in mappers])
+        )
+
+    def test_get_path_one_level_string(self):
+        sess = Session()
+        q = sess.query(User)
+
+        opt = self._option_fixture("addresses")
+        self._assert_path_result(opt, q, [(User, 'addresses')], [User])
+
+    def test_get_path_one_level_attribute(self):
+        sess = Session()
+        q = sess.query(User)
+
+        opt = self._option_fixture(User.addresses)
+        self._assert_path_result(opt, q, [(User, 'addresses')], [User])
+
+    def test_path_on_entity_but_doesnt_match_currentpath(self):
+        # ensure "current path" is fully consumed before
+        # matching against current entities.
+        # see [ticket:2098]
+        sess = Session()
+        q = sess.query(User)
+        opt = self._option_fixture('email_address', 'id')
+        q = sess.query(Address)._with_current_path([class_mapper(User), 'addresses'])
+        self._assert_path_result(opt, q, [], [])
+
+    def test_get_path_one_level_with_unrelated(self):
+        sess = Session()
+        q = sess.query(Order)
+
+        opt = self._option_fixture("addresses")
+        self._assert_path_result(opt, q, [], [])
+
+    def test_path_multilevel_string(self):
+        sess = Session()
+        q = sess.query(User)
+
+        opt = self._option_fixture("orders.items.keywords")
+        self._assert_path_result(opt, q, [
+            (User, 'orders'), 
+            (User, 'orders', Order, 'items'),
+            (User, 'orders', Order, 'items', Item, 'keywords')
+        ], 
+        [User, Order, Item])
+
+    def test_path_multilevel_attribute(self):
+        sess = Session()
+        q = sess.query(User)
+
+        opt = self._option_fixture(User.orders, Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [
+            (User, 'orders'), 
+            (User, 'orders', Order, 'items'),
+            (User, 'orders', Order, 'items', Item, 'keywords')
+        ], 
+        [User, Order, Item])
+
+    def test_with_current_matching_string(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture("orders.items.keywords")
+        self._assert_path_result(opt, q, [
+            (Item, 'keywords')
+        ], [Item])
+
+    def test_with_current_matching_attribute(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture(User.orders, Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [
+            (Item, 'keywords')
+        ], [Item])
+
+    def test_with_current_nonmatching_string(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture("keywords")
+        self._assert_path_result(opt, q, [], [])
+
+        opt = self._option_fixture("items.keywords")
+        self._assert_path_result(opt, q, [], [])
+
+    def test_with_current_nonmatching_attribute(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture(Item.keywords)
+        self._assert_path_result(opt, q, [], [])
+
+        opt = self._option_fixture(Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [], [])
+
+    def test_from_base_to_subclass_attr(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+
+        q = sess.query(Address)
+        opt = self._option_fixture(SubAddr.flub)
+
+        self._assert_path_result(opt, q, [(Address, 'flub')], [SubAddr])
+
+    def test_from_subclass_to_subclass_attr(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+
+        q = sess.query(SubAddr)
+        opt = self._option_fixture(SubAddr.flub)
+
+        self._assert_path_result(opt, q, [(SubAddr, 'flub')], [SubAddr])
+
+    def test_from_base_to_base_attr_via_subclass(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+
+        q = sess.query(Address)
+        opt = self._option_fixture(SubAddr.user)
+
+        self._assert_path_result(opt, q, [(Address, 'user')], [Address])
+
+    def test_of_type(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address)
+
+        q = sess.query(User)
+        opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.user)
+
+        self._assert_path_result(opt, q, [
+            (User, 'addresses'),
+            (User, 'addresses', SubAddr, 'user')
+        ], [User, Address])
+
+    def test_of_type_plus_level(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+
+        q = sess.query(User)
+        opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.flub)
+
+        self._assert_path_result(opt, q, [
+            (User, 'addresses'),
+            (User, 'addresses', SubAddr, 'flub')
+        ], [User, SubAddr])
+
+    def test_aliased_single(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)
+        opt = self._option_fixture(ualias.addresses)
+        self._assert_path_result(opt, q, [(ualias, 'addresses')], [User])
+
+    def test_with_current_aliased_single(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, ualias.addresses)
+        self._assert_path_result(opt, q, [(ualias, 'addresses')], [User])
+
+    def test_with_current_aliased_single_nonmatching_option(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(User)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, ualias.addresses)
+        self._assert_path_result(opt, q, [], [])
+
+    @testing.fails_if(lambda: True, "Broken feature")
+    def test_with_current_aliased_single_nonmatching_entity(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, User.addresses)
+        self._assert_path_result(opt, q, [], [])