]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Fixed bug in Query whereby the usage of aliased() constructs
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 23 Mar 2010 18:54:26 +0000 (14:54 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 23 Mar 2010 18:54:26 +0000 (14:54 -0400)
would fail if the underlying table (but not the actual alias)
were referenced inside the subquery generated by
q.from_self() or q.select_from().

CHANGES
lib/sqlalchemy/sql/util.py
test/orm/test_query.py
test/sql/test_generative.py

diff --git a/CHANGES b/CHANGES
index baa8e03188d480611609c527717a626c085a34f1..1cb4625d0e4c4eaec49a08bf6a36b05628de784c 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -13,6 +13,11 @@ CHANGES
     subquery, when joining on the same criterion as was on the 
     inside.
 
+  - Fixed bug in Query whereby the usage of aliased() constructs
+    would fail if the underlying table (but not the actual alias) 
+    were referenced inside the subquery generated by 
+    q.from_self() or q.select_from().
+    
 0.6beta2
 ========
 
index 74651a9d1cd697fa70da92fa1d84b6ab302704ff..d5575e0e73e7f546428e9208adc28f08a6f017a0 100644 (file)
@@ -579,7 +579,7 @@ class ClauseAdapter(visitors.ReplacingCloningVisitor):
             return None
         elif self.exclude and col in self.exclude:
             return None
-
+        
         return self._corresponding_column(col, True)
 
 class ColumnAdapter(ClauseAdapter):
@@ -587,11 +587,13 @@ class ColumnAdapter(ClauseAdapter):
     
     Provides the ability to "wrap" this ClauseAdapter 
     around another, a columns dictionary which returns
-    cached, adapted elements given an original, and an 
+    adapted elements given an original, and an 
     adapted_row() factory.
     
     """
-    def __init__(self, selectable, equivalents=None, chain_to=None, include=None, exclude=None, adapt_required=False):
+    def __init__(self, selectable, equivalents=None, 
+                        chain_to=None, include=None, 
+                        exclude=None, adapt_required=False):
         ClauseAdapter.__init__(self, selectable, equivalents, include, exclude)
         if chain_to:
             self.chain(chain_to)
@@ -617,7 +619,7 @@ class ColumnAdapter(ClauseAdapter):
         return locate
 
     def _locate_col(self, col):
-        c = self._corresponding_column(col, False)
+        c = self._corresponding_column(col, True)
         if c is None:
             c = self.adapt_clause(col)
             
index cd76f53b0cea422fe3e6fa1e05654b5405d83c5d..0ca196a56ec89a6d6b6c94fa9dc543e361e9b6bf 100644 (file)
@@ -3283,7 +3283,7 @@ class CustomJoinTest(QueryTest):
 
         assert [User(id=7)] == q.join('open_orders', 'items', aliased=True).filter(Item.id==4).join('closed_orders', 'items', aliased=True).filter(Item.id==3).all()
 
-class SelfReferentialTest(_base.MappedTest):
+class SelfReferentialTest(_base.MappedTest, AssertsCompiledSQL):
     run_setup_mappers = 'once'
     run_inserts = 'once'
     run_deletes = None
@@ -3337,7 +3337,60 @@ class SelfReferentialTest(_base.MappedTest):
         node = sess.query(Node).filter_by(data='n122').join('parent', aliased=True).filter_by(data='n12').\
             join('parent', aliased=True, from_joinpoint=True).filter_by(data='n1').first()
         assert node.data == 'n122'
+    
+    def test_from_self_inside_excludes_outside(self):
+        """test the propagation of aliased() from inside to outside
+        on a from_self()..
+        """
+        sess = create_session()
+        
+        n1 = aliased(Node)
+        
+        # n1 is not inside the from_self(), so all cols must be maintained
+        # on the outside
+        self.assert_compile(
+            sess.query(Node).filter(Node.data=='n122').from_self(n1, Node.id),
+            "SELECT nodes_1.id AS nodes_1_id, nodes_1.parent_id AS nodes_1_parent_id, "
+            "nodes_1.data AS nodes_1_data, anon_1.nodes_id AS anon_1_nodes_id "
+            "FROM nodes AS nodes_1, (SELECT nodes.id AS nodes_id, "
+            "nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM "
+            "nodes WHERE nodes.data = :data_1) AS anon_1",
+            use_default_dialect=True
+        )
 
+        parent = aliased(Node)
+        grandparent = aliased(Node)
+        q = sess.query(Node, parent, grandparent).\
+            join((Node.parent, parent), (parent.parent, grandparent)).\
+                filter(Node.data=='n122').filter(parent.data=='n12').\
+                filter(grandparent.data=='n1').from_self().limit(1)
+        
+        # parent, grandparent *are* inside the from_self(), so they 
+        # should get aliased to the outside.
+        self.assert_compile(
+            q,
+            "SELECT anon_1.nodes_id AS anon_1_nodes_id, "
+            "anon_1.nodes_parent_id AS anon_1_nodes_parent_id, "
+            "anon_1.nodes_data AS anon_1_nodes_data, "
+            "anon_1.nodes_1_id AS anon_1_nodes_1_id, "
+            "anon_1.nodes_1_parent_id AS anon_1_nodes_1_parent_id, "
+            "anon_1.nodes_1_data AS anon_1_nodes_1_data, "
+            "anon_1.nodes_2_id AS anon_1_nodes_2_id, "
+            "anon_1.nodes_2_parent_id AS anon_1_nodes_2_parent_id, "
+            "anon_1.nodes_2_data AS anon_1_nodes_2_data "
+            "FROM (SELECT nodes.id AS nodes_id, nodes.parent_id "
+            "AS nodes_parent_id, nodes.data AS nodes_data, "
+            "nodes_1.id AS nodes_1_id, nodes_1.parent_id AS nodes_1_parent_id, "
+            "nodes_1.data AS nodes_1_data, nodes_2.id AS nodes_2_id, "
+            "nodes_2.parent_id AS nodes_2_parent_id, nodes_2.data AS "
+            "nodes_2_data FROM nodes JOIN nodes AS nodes_1 ON "
+            "nodes_1.id = nodes.parent_id JOIN nodes AS nodes_2 "
+            "ON nodes_2.id = nodes_1.parent_id "
+            "WHERE nodes.data = :data_1 AND nodes_1.data = :data_2 AND "
+            "nodes_2.data = :data_3) AS anon_1  LIMIT 1",
+            use_default_dialect=True
+        )
+        
     def test_explicit_join(self):
         sess = create_session()
     
@@ -3382,6 +3435,7 @@ class SelfReferentialTest(_base.MappedTest):
             [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)]
         )
     
+        
     def test_multiple_explicit_entities(self):
         sess = create_session()
     
index a6f8c5956058a277ff60a23f127b27d18e162790..5457c7a7974e3029662d4e0e649d2c6ac9b2efe3 100644 (file)
@@ -207,6 +207,31 @@ class ClauseTest(TestBase, AssertsCompiledSQL):
         assert c1 == str(clause)
         assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
     
+    def test_aliased_column_adapt(self):
+        clause = t1.select()
+        
+        aliased = t1.select().alias()
+        aliased2 = t1.alias()
+        adapter = sql_util.ColumnAdapter(aliased)
+        
+        f = select([
+            adapter.columns[c]
+            for c in aliased2.c
+        ]).select_from(aliased)
+    
+        s = select([aliased2]).select_from(aliased)
+        eq_(str(s), str(f))
+        f = select([
+            adapter.columns[func.count(aliased2.c.col1)]
+        ]).select_from(aliased)
+        eq_(
+            str(select([func.count(aliased2.c.col1)]).select_from(aliased)),
+            str(f)
+        )
+        
+        
     def test_text(self):
         clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
         c1 = str(clause)