]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Keep already defined cte
authorEric Masseran <eric.masseran@gmail.com>
Fri, 30 Jul 2021 17:48:48 +0000 (19:48 +0200)
committerEric Masseran <eric.masseran@gmail.com>
Fri, 30 Jul 2021 17:48:48 +0000 (19:48 +0200)
lib/sqlalchemy/sql/compiler.py
test/sql/test_cte.py

index c191a5049dfd059f330107e3283338e6f6085ac9..6a55eea5ff92f8c2201c8da7a042530ed4232889 100644 (file)
@@ -2510,6 +2510,9 @@ class SQLCompiler(Compiled):
         is_new_cte = True
         embedded_in_current_named_cte = False
 
+        if cte in self.level_by_ctes:
+            cte_level = self.level_by_ctes[cte]
+
         cte_level_name = (cte_level, cte_name)
         if cte_level_name in self.ctes_by_name:
             existing_cte = self.ctes_by_name[cte_level_name]
index 45cbea3a881f50f79e703aef2cb36ccb0562e038..f74efa13a1e77b1bfbcd5dd2228d201c5498f1b4 100644 (file)
@@ -1888,6 +1888,49 @@ class NestingCTETest(fixtures.TestBase, AssertsCompiledSQL):
             "SELECT cte.outer_cte FROM cte",
         )
 
+    def test_same_nested_cte_is_not_generated_twice(self):
+        # Same = name and query
+        nesting_cte_used_twice = select([literal(1).label("inner_cte_1")]).cte(
+            "nesting_cte", nesting=True
+        )
+        select_add_cte = select(
+            [(nesting_cte_used_twice.c.inner_cte_1 + 1).label("next_value")]
+        ).cte("nesting_2", nesting=True)
+
+        union_cte = (
+            select(
+                [
+                    (nesting_cte_used_twice.c.inner_cte_1 - 1).label(
+                        "next_value"
+                    )
+                ]
+            )
+            .union(select([select_add_cte]))
+            .cte("wrapper", nesting=True)
+        )
+
+        stmt = (
+            select([union_cte])
+            .add_cte(nesting_cte_used_twice)
+            .union(select([nesting_cte_used_twice]))
+        )
+
+        self.assert_compile(
+            stmt,
+            "WITH nesting_cte AS "
+            "(SELECT %(param_1)s AS inner_cte_1)"
+            ", wrapper AS "
+            "(WITH nesting_2 AS "
+            "(SELECT nesting_cte.inner_cte_1 + %(inner_cte_1_2)s AS next_value "
+            "FROM nesting_cte)"
+            " SELECT nesting_cte.inner_cte_1 - %(inner_cte_1_1)s AS next_value "
+            "FROM nesting_cte UNION SELECT nesting_2.next_value AS next_value "
+            "FROM nesting_2)"
+            " SELECT wrapper.next_value "
+            "FROM wrapper UNION SELECT nesting_cte.inner_cte_1 "
+            "FROM nesting_cte",
+        )
+
     def test_recursive_nesting_cte_in_recursive_cte(self):
         nesting_cte = select([literal(1).label("inner_cte")]).cte(
             "nesting", nesting=True, recursive=True