]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- add full series of unit tests for PropertyOption._get_paths, thus enabling us to
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 12 Dec 2010 00:50:41 +0000 (19:50 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 12 Dec 2010 00:50:41 +0000 (19:50 -0500)
rewrite it

lib/sqlalchemy/orm/interfaces.py
test/orm/test_query.py

index 0694ee6966b6922b87b4437f415c2fa389f82d83..127f070cc80b59626112b52e64982eea717a9745 100644 (file)
@@ -23,7 +23,8 @@ import sqlalchemy.exceptions as sa_exc
 from sqlalchemy import log, util, event
 from sqlalchemy.sql import expression
 
-class_mapper = None
+mapperutil = util.importlater('sqlalchemy.orm', 'util')
+
 collections = None
 
 __all__ = (
@@ -366,11 +367,7 @@ def deserialize_path(path):
     if path is None:
         return None
 
-    global class_mapper
-    if class_mapper is None:
-        from sqlalchemy.orm import class_mapper
-    
-    p = tuple(chain(*[(class_mapper(cls), key) for cls, key in path]))
+    p = tuple(chain(*[(mapperutil.class_mapper(cls), key) for cls, key in path]))
     if p and p[-1] is None:
         p = p[0:-1]
     return p
@@ -439,13 +436,11 @@ class PropertyOption(MapperOption):
         self.__dict__ = state
 
     def _find_entity( self, query, mapper, raiseerr):
-        from sqlalchemy.orm.util import _class_to_mapper, \
-            _is_aliased_class
-        if _is_aliased_class(mapper):
+        if mapperutil._is_aliased_class(mapper):
             searchfor = mapper
             isa = False
         else:
-            searchfor = _class_to_mapper(mapper)
+            searchfor = mapperutil._class_to_mapper(mapper)
             isa = True
         for ent in query._mapper_entities:
             if searchfor is ent.path_entity or isa \
@@ -469,6 +464,7 @@ class PropertyOption(MapperOption):
         # existing path
 
         current_path = list(query._current_path)
+        
         tokens = []
         for key in util.to_list(self.key):
             if isinstance(key, basestring):
index c7e9bc7caa4a94a4d7ccf1e509a3c2df567cfe2f..b6d9947b4c4c9de9056191b2e858b160c826e7d5 100644 (file)
@@ -1525,10 +1525,6 @@ class ImmediateTest(_fixtures.FixtureTest):
         sess.bind = testing.db
         eq_(sess.query().value(sa.literal_column('1').label('x')), 1)
 
-
-
-    
-    
 class UpdateDeleteTest(_base.MappedTest):
     @classmethod
     def define_tables(cls, metadata):
@@ -1866,8 +1862,6 @@ class UpdateDeleteTest(_base.MappedTest):
         
 
 class StatementOptionsTest(QueryTest):
-    """ Make sure a Query's execution_options are passed on to the
-    resulting statement. """
 
     def test_query_with_statement_option(self):
         sess = create_session(bind=testing.db, autocommit=False)
@@ -1891,3 +1885,226 @@ class StatementOptionsTest(QueryTest):
     # TODO: Test that statement options are passed on to
     # updates/deletes, but currently there are no such options
     # applicable for them.
+
+class OptionsTest(QueryTest):
+    """Test the _get_paths() method of PropertyOption."""
+    
+    def _option_fixture(self, *arg):
+        from sqlalchemy.orm import interfaces
+        class Opt(interfaces.PropertyOption):
+            pass
+        return Opt(arg)
+    
+    def _make_path(self, path):
+        r = []
+        for i, item in enumerate(path):
+            if i % 2 == 0:
+                if isinstance(item, type):
+                    item = class_mapper(item)
+            r.append(item)
+        return tuple(r)
+        
+    def _assert_path_result(self, opt, q, paths, mappers):
+        eq_(
+            opt._get_paths(q, False),
+            ([self._make_path(p) for p in paths], 
+            [class_mapper(c) for c in mappers])
+        )
+        
+    def test_get_path_one_level_string(self):
+        sess = Session()
+        q = sess.query(User)
+        
+        opt = self._option_fixture("addresses")
+        self._assert_path_result(opt, q, [(User, 'addresses')], [User])
+
+    def test_get_path_one_level_attribute(self):
+        sess = Session()
+        q = sess.query(User)
+        
+        opt = self._option_fixture(User.addresses)
+        self._assert_path_result(opt, q, [(User, 'addresses')], [User])
+
+    def test_get_path_one_level_with_unrelated(self):
+        sess = Session()
+        q = sess.query(Order)
+        
+        opt = self._option_fixture("addresses")
+        self._assert_path_result(opt, q, [], [])
+
+    def test_path_multilevel_string(self):
+        sess = Session()
+        q = sess.query(User)
+        
+        opt = self._option_fixture("orders.items.keywords")
+        self._assert_path_result(opt, q, [
+            (User, 'orders'), 
+            (User, 'orders', Order, 'items'),
+            (User, 'orders', Order, 'items', Item, 'keywords')
+        ], 
+        [User, Order, Item])
+
+    def test_path_multilevel_attribute(self):
+        sess = Session()
+        q = sess.query(User)
+        
+        opt = self._option_fixture(User.orders, Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [
+            (User, 'orders'), 
+            (User, 'orders', Order, 'items'),
+            (User, 'orders', Order, 'items', Item, 'keywords')
+        ], 
+        [User, Order, Item])
+
+    def test_with_current_matching_string(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture("orders.items.keywords")
+        self._assert_path_result(opt, q, [
+            (Item, 'keywords')
+        ], [Item])
+
+    def test_with_current_matching_attribute(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture(User.orders, Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [
+            (Item, 'keywords')
+        ], [Item])
+
+    def test_with_current_nonmatching_string(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture("keywords")
+        self._assert_path_result(opt, q, [], [])
+
+        opt = self._option_fixture("items.keywords")
+        self._assert_path_result(opt, q, [], [])
+
+    def test_with_current_nonmatching_attribute(self):
+        sess = Session()
+        q = sess.query(Item)._with_current_path(
+                self._make_path([User, 'orders', Order, 'items'])
+            )
+
+        opt = self._option_fixture(Item.keywords)
+        self._assert_path_result(opt, q, [], [])
+
+        opt = self._option_fixture(Order.items, Item.keywords)
+        self._assert_path_result(opt, q, [], [])
+
+    def test_from_base_to_subclass_attr(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+        
+        q = sess.query(Address)
+        opt = self._option_fixture(SubAddr.flub)
+        
+        self._assert_path_result(opt, q, [(Address, 'flub')], [SubAddr])
+
+    def test_from_subclass_to_subclass_attr(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+        
+        q = sess.query(SubAddr)
+        opt = self._option_fixture(SubAddr.flub)
+        
+        self._assert_path_result(opt, q, [(SubAddr, 'flub')], [SubAddr])
+
+    def test_from_base_to_base_attr_via_subclass(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+        
+        q = sess.query(Address)
+        opt = self._option_fixture(SubAddr.user)
+        
+        self._assert_path_result(opt, q, [(Address, 'user')], [Address])
+
+    def test_of_type(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address)
+        
+        q = sess.query(User)
+        opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.user)
+        
+        self._assert_path_result(opt, q, [
+            (User, 'addresses'),
+            (User, 'addresses', SubAddr, 'user')
+        ], [User, Address])
+
+    def test_of_type_plus_level(self):
+        sess = Session()
+        class SubAddr(Address):
+            pass
+        mapper(SubAddr, inherits=Address, properties={
+            'flub':relationship(Dingaling)
+        })
+        
+        q = sess.query(User)
+        opt = self._option_fixture(User.addresses.of_type(SubAddr), SubAddr.flub)
+        
+        self._assert_path_result(opt, q, [
+            (User, 'addresses'),
+            (User, 'addresses', SubAddr, 'flub')
+        ], [User, SubAddr])
+        
+    def test_aliased_single(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)
+        opt = self._option_fixture(ualias.addresses)
+        self._assert_path_result(opt, q, [(ualias, 'addresses')], [User])
+
+    def test_with_current_aliased_single(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, ualias.addresses)
+        self._assert_path_result(opt, q, [(ualias, 'addresses')], [User])
+
+    def test_with_current_aliased_single_nonmatching_option(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(User)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, ualias.addresses)
+        self._assert_path_result(opt, q, [], [])
+
+    @testing.fails_if(lambda: True, "Broken feature")
+    def test_with_current_aliased_single_nonmatching_entity(self):
+        sess = Session()
+        ualias = aliased(User)
+        q = sess.query(ualias)._with_current_path(
+                        self._make_path([Address, 'user'])
+                )
+        opt = self._option_fixture(Address.user, User.addresses)
+        self._assert_path_result(opt, q, [], [])
+        
+        
+