]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
anonymize CRUD params if visiting_cte is present
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 14 Mar 2025 14:33:22 +0000 (10:33 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 14 Mar 2025 14:33:22 +0000 (10:33 -0400)
Fixed issue in :class:`.CTE` constructs involving multiple DDL
:class:`.Insert` statements with multiple VALUES parameter sets where the
bound parameter names generated for these parameter sets would conflict,
generating a compile time error.

Fixes: #12363
Change-Id: If8344ff725d4e0ec58d3ff61f38a0edcfc5bdebd

doc/build/changelog/unreleased_20/12363.rst [new file with mode: 0644]
lib/sqlalchemy/sql/crud.py
test/sql/test_cte.py

diff --git a/doc/build/changelog/unreleased_20/12363.rst b/doc/build/changelog/unreleased_20/12363.rst
new file mode 100644 (file)
index 0000000..e04e51f
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, sql
+    :tickets: 12363
+
+    Fixed issue in :class:`.CTE` constructs involving multiple DDL
+    :class:`.Insert` statements with multiple VALUES parameter sets where the
+    bound parameter names generated for these parameter sets would conflict,
+    generating a compile time error.
+
index 19af40ff08038e375d51e87f4d95378287713963..c0c0c86bb9c6d808dd2bd382ce2a605aae31fe82 100644 (file)
@@ -393,6 +393,7 @@ def _create_bind_param(
     process: Literal[True] = ...,
     required: bool = False,
     name: Optional[str] = None,
+    force_anonymous: bool = False,
     **kw: Any,
 ) -> str: ...
 
@@ -413,10 +414,14 @@ def _create_bind_param(
     process: bool = True,
     required: bool = False,
     name: Optional[str] = None,
+    force_anonymous: bool = False,
     **kw: Any,
 ) -> Union[str, elements.BindParameter[Any]]:
-    if name is None:
+    if force_anonymous:
+        name = None
+    elif name is None:
         name = col.key
+
     bindparam = elements.BindParameter(
         name, value, type_=col.type, required=required
     )
@@ -486,7 +491,7 @@ def _key_getters_for_crud_column(
         )
 
         def _column_as_key(
-            key: Union[ColumnClause[Any], str]
+            key: Union[ColumnClause[Any], str],
         ) -> Union[str, Tuple[str, str]]:
             str_key = c_key_role(key)
             if hasattr(key, "table") and key.table in _et:
@@ -832,6 +837,7 @@ def _append_param_parameter(
 ):
     value = parameters.pop(col_key)
 
+    has_visiting_cte = kw.get("visiting_cte") is not None
     col_value = compiler.preparer.format_column(
         c, use_table=compile_state.include_table_with_column_exprs
     )
@@ -864,6 +870,7 @@ def _append_param_parameter(
                 else "%s_m0" % _col_bind_name(c)
             ),
             accumulate_bind_names=accumulated_bind_names,
+            force_anonymous=has_visiting_cte,
             **kw,
         )
     elif value._is_bind_parameter:
@@ -1435,6 +1442,7 @@ def _extend_values_for_multiparams(
     values_0 = initial_values
     values = [initial_values]
 
+    has_visiting_cte = kw.get("visiting_cte") is not None
     mp = compile_state._multi_parameters
     assert mp is not None
     for i, row in enumerate(mp[1:]):
@@ -1451,7 +1459,8 @@ def _extend_values_for_multiparams(
                         compiler,
                         col,
                         row[key],
-                        name="%s_m%d" % (col.key, i + 1),
+                        name=("%s_m%d" % (col.key, i + 1)),
+                        force_anonymous=has_visiting_cte,
                         **kw,
                     )
                 else:
index d0ecc38c86f26dcf035daf1fc8fb2337c9fc9056..92b83b7fe3517f6e74c91ac3d102a94e0176d00d 100644 (file)
@@ -1900,6 +1900,37 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL):
             checkparams={"id": 1, "price": 20, "param_1": 10, "price_1": 50},
         )
 
+    @testing.variation("num_ctes", ["one", "two"])
+    def test_multiple_multivalues_inserts(self, num_ctes):
+        """test #12363"""
+
+        t1 = table("table1", column("id"), column("a"), column("b"))
+
+        t2 = table("table2", column("id"), column("a"), column("b"))
+
+        if num_ctes.one:
+            self.assert_compile(
+                insert(t1)
+                .values([{"a": 1}, {"a": 2}])
+                .add_cte(insert(t2).values([{"a": 5}, {"a": 6}]).cte()),
+                "WITH anon_1 AS "
+                "(INSERT INTO table2 (a) VALUES (:param_1), (:param_2)) "
+                "INSERT INTO table1 (a) VALUES (:a_m0), (:a_m1)",
+            )
+
+        elif num_ctes.two:
+            self.assert_compile(
+                insert(t1)
+                .values([{"a": 1}, {"a": 2}])
+                .add_cte(insert(t1).values([{"b": 5}, {"b": 6}]).cte())
+                .add_cte(insert(t2).values([{"a": 5}, {"a": 6}]).cte()),
+                "WITH anon_1 AS "
+                "(INSERT INTO table1 (b) VALUES (:param_1), (:param_2)), "
+                "anon_2 AS "
+                "(INSERT INTO table2 (a) VALUES (:param_3), (:param_4)) "
+                "INSERT INTO table1 (a) VALUES (:a_m0), (:a_m1)",
+            )
+
     def test_insert_from_select_uses_independent_cte(self):
         """test #7036"""