]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Added official support for a CTE used by the SELECT present
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 8 May 2015 16:37:55 +0000 (12:37 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 8 May 2015 16:42:36 +0000 (12:42 -0400)
inside of :meth:`.Insert.from_select`.  This behavior worked
accidentally up until 0.9.9, when it no longer worked due to
unrelated changes as part of :ticket:`3248`.   Note that this
is the rendering of the WITH clause after the INSERT, before the
SELECT; the full functionality of CTEs rendered at the top
level of INSERT, UPDATE, DELETE is a new feature targeted for a
later release.
fixes #3418

(cherry picked from commit eb1bb84fbc10c801c7269a3d38c9e0235327857e)

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/sql/compiler.py
test/sql/test_insert.py

index 175da416146450359473fbeae60518b0233e1e0a..1a653af96fc64d922c5594d676d64b07f68c57ba 100644 (file)
 .. changelog::
     :version: 0.9.10
 
+    .. change::
+        :tags: feature, sql
+        :tickets: 3418
+        :versions: 1.0.5
+
+        Added official support for a CTE used by the SELECT present
+        inside of :meth:`.Insert.from_select`.  This behavior worked
+        accidentally up until 0.9.9, when it no longer worked due to
+        unrelated changes as part of :ticket:`3248`.   Note that this
+        is the rendering of the WITH clause after the INSERT, before the
+        SELECT; the full functionality of CTEs rendered at the top
+        level of INSERT, UPDATE, DELETE is a new feature targeted for a
+        later release.
+
     .. change::
         :tags: bug, ext
         :tickets: 3408
index 9e27d8cb51350772e5e3a7aedb43791b73c497f4..f86a3581071170786ab8c77a405ed137c1fa2fd0 100644 (file)
@@ -759,7 +759,8 @@ class SQLCompiler(Compiled):
             {
                 'correlate_froms': entry['correlate_froms'],
                 'iswrapper': toplevel,
-                'asfrom_froms': entry['asfrom_froms']
+                'asfrom_froms': entry['asfrom_froms'],
+                "selectable": cs
             })
 
         keyword = self.compound_keywords.get(cs.keyword)
@@ -1478,7 +1479,8 @@ class SQLCompiler(Compiled):
         new_entry = {
             'asfrom_froms': new_correlate_froms,
             'iswrapper': iswrapper,
-            'correlate_froms': all_correlate_froms
+            'correlate_froms': all_correlate_froms,
+            "selectable": select
         }
         self.stack.append(new_entry)
 
@@ -1572,7 +1574,7 @@ class SQLCompiler(Compiled):
             text += self.for_update_clause(select)
 
         if self.ctes and \
-                compound_index == 0 and toplevel:
+                compound_index == 0 and self._is_toplevel_select(select):
             text = self._render_cte_clause() + text
 
         self.stack.pop(-1)
@@ -1582,6 +1584,20 @@ class SQLCompiler(Compiled):
         else:
             return text
 
+    def _is_toplevel_select(self, select):
+        """Return True if the stack is placed at the given select, and
+        is also the outermost SELECT, meaning there is either no stack
+        before this one, or the enclosing stack is a topmost INSERT.
+
+        """
+        return (
+            self.stack[-1]['selectable'] is select and
+            (
+                len(self.stack) == 1 or self.isinsert and len(self.stack) == 2
+                and self.statement is self.stack[0]['selectable']
+            )
+        )
+
     def _generate_prefixes(self, stmt, prefixes, **kw):
         clause = " ".join(
             prefix._compiler_dispatch(self, **kw)
@@ -1796,7 +1812,8 @@ class SQLCompiler(Compiled):
         self.stack.append(
             {'correlate_froms': set([update_stmt.table]),
              "iswrapper": False,
-             "asfrom_froms": set([update_stmt.table])})
+             "asfrom_froms": set([update_stmt.table]),
+             "selectable": update_stmt})
 
         self.isupdate = True
 
@@ -2251,7 +2268,8 @@ class SQLCompiler(Compiled):
     def visit_delete(self, delete_stmt, **kw):
         self.stack.append({'correlate_froms': set([delete_stmt.table]),
                            "iswrapper": False,
-                           "asfrom_froms": set([delete_stmt.table])})
+                           "asfrom_froms": set([delete_stmt.table]),
+                           "selectable": delete_stmt})
         self.isdelete = True
 
         text = "DELETE "
index bedaa72d43333c73ad6f1d0cb46f19e06d3ee853..1d662e76fb4f5972c84f243d53735bd617c72a62 100644 (file)
@@ -153,6 +153,41 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
             checkparams={"name_1": "foo"}
         )
 
+    def test_insert_from_select_cte_one(self):
+        table1 = self.tables.mytable
+
+        cte = select([table1.c.name]).where(table1.c.name == 'bar').cte()
+
+        sel = select([table1.c.myid, table1.c.name]).where(
+            table1.c.name == cte.c.name)
+
+        ins = self.tables.myothertable.insert().\
+            from_select(("otherid", "othername"), sel)
+        self.assert_compile(
+            ins,
+            "INSERT INTO myothertable (otherid, othername) WITH anon_1 AS "
+            "(SELECT mytable.name AS name FROM mytable "
+            "WHERE mytable.name = :name_1) "
+            "SELECT mytable.myid, mytable.name FROM mytable, anon_1 "
+            "WHERE mytable.name = anon_1.name",
+            checkparams={"name_1": "bar"}
+        )
+
+    def test_insert_from_select_cte_two(self):
+        table1 = self.tables.mytable
+
+        cte = table1.select().cte("c")
+        stmt = cte.select()
+        ins = table1.insert().from_select(table1.c, stmt)
+
+        self.assert_compile(
+            ins,
+            "INSERT INTO mytable (myid, name, description) "
+            "WITH c AS (SELECT mytable.myid AS myid, mytable.name AS name, "
+            "mytable.description AS description FROM mytable) "
+            "SELECT c.myid, c.name, c.description FROM c"
+        )
+
     def test_insert_from_select_select_alt_ordering(self):
         table1 = self.tables.mytable
         sel = select([table1.c.name, table1.c.myid]).where(