--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 7036
+
+ Fixed issue related to new ``add_cte()`` feature where pairing two
+ "INSERT..FROM SELECT" statements simultaneously would lose track of the two
+ independent SELECT statements, leading to the wrong SQL.
returning_clause = None
if insert_stmt.select is not None:
- select_text = self.process(self._insert_from_select, **kw)
+ # placed here by crud.py
+ select_text = self.process(
+ self.stack[-1]["insert_from_select"], **kw
+ )
if self.ctes and toplevel and self.dialect.cte_follows_insert:
text += " %s%s" % (self._render_cte_clause(), select_text)
cols = [stmt.table.c[_column_as_key(name)] for name in stmt._select_names]
- compiler._insert_from_select = stmt.select
+ assert compiler.stack[-1]["selectable"] is stmt
+
+ compiler.stack[-1]["insert_from_select"] = stmt.select
add_select_cols = []
if stmt.include_insert_from_select_defaults:
if add_select_cols:
values.extend(add_select_cols)
- compiler._insert_from_select = compiler._insert_from_select._generate()
- compiler._insert_from_select._raw_columns = tuple(
- compiler._insert_from_select._raw_columns
+ ins_from_select = compiler.stack[-1]["insert_from_select"]
+ ins_from_select = ins_from_select._generate()
+ ins_from_select._raw_columns = tuple(
+ ins_from_select._raw_columns
) + tuple(expr for col, col_expr, expr in add_select_cols)
+ compiler.stack[-1]["insert_from_select"] = ins_from_select
def _scan_cols(
checkparams={"id": 1, "price": 20, "param_1": 10, "price_1": 50},
)
+ def test_insert_from_select_uses_independent_cte(self):
+ """test #7036"""
+
+ t1 = table("table1", column("id1"), column("a"))
+
+ t2 = table("table2", column("id2"), column("b"))
+
+ ins1 = t1.insert().from_select(["id1", "a"], select(1, text("'a'")))
+
+ cte1 = ins1.cte("cte1")
+
+ ins2 = t2.insert().from_select(["id2", "b"], select(2, text("'b'")))
+
+ ins2 = ins2.add_cte(cte1)
+
+ self.assert_compile(
+ ins2,
+ "WITH cte1 AS "
+ "(INSERT INTO table1 (id1, a) SELECT 1, 'a') "
+ "INSERT INTO table2 (id2, b) SELECT 2, 'b'",
+ checkparams={},
+ )
+
def test_update_uses_independent_cte(self):
products = table("products", column("id"), column("price"))