]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
downgrade batches for bindparam() in SET
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 17 Feb 2026 20:58:22 +0000 (15:58 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 17 Feb 2026 21:11:09 +0000 (16:11 -0500)
Fixed issue where :meth:`_postgresql.Insert.on_conflict_do_update`
using parametrized bound parameters in the ``set_`` clause would fail
when used with executemany batching. For dialects that use the
``use_insertmanyvalues_wo_returning`` optimization (psycopg2),
insertmanyvalues is now disabled when there is an ON CONFLICT clause.
For cases with RETURNING, row-at-a-time mode is used when the SET
clause contains parametrized bindparams (bindparams that receive
values from the parameters dict), ensuring each row's parameters are
correctly applied. ON CONFLICT statements using expressions like
``excluded.<column>`` continue to batch normally.

Fixed issue where :meth:`_sqlite.Insert.on_conflict_do_update`
using parametrized bound parameters in the ``set_`` clause would fail
when used with executemany batching. Row-at-a-time mode is now used
for ON CONFLICT statements with RETURNING that contain parametrized
bindparams, ensuring each row's parameters are correctly applied. ON
CONFLICT statements using expressions like ``excluded.<column>``
continue to batch normally.

Fixes: #13130
Change-Id: I0c5a9142401c745d38e58d071c16e53610f9bfea
(cherry picked from commit 5b2e7aae01cc2e55e68a8445569ee709b17715dd)

doc/build/changelog/unreleased_20/13130.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/crud.py
test/dialect/postgresql/test_on_conflict.py
test/dialect/sqlite/test_on_conflict.py

diff --git a/doc/build/changelog/unreleased_20/13130.rst b/doc/build/changelog/unreleased_20/13130.rst
new file mode 100644 (file)
index 0000000..0a9febc
--- /dev/null
@@ -0,0 +1,27 @@
+.. change::
+    :tags: bug, postgresql
+    :tickets: 13130
+
+    Fixed issue where :meth:`_postgresql.Insert.on_conflict_do_update`
+    using parametrized bound parameters in the ``set_`` clause would fail
+    when used with executemany batching. For dialects that use the
+    ``use_insertmanyvalues_wo_returning`` optimization (psycopg2),
+    insertmanyvalues is now disabled when there is an ON CONFLICT clause.
+    For cases with RETURNING, row-at-a-time mode is used when the SET
+    clause contains parametrized bindparams (bindparams that receive
+    values from the parameters dict), ensuring each row's parameters are
+    correctly applied. ON CONFLICT statements using expressions like
+    ``excluded.<column>`` continue to batch normally.
+
+
+.. change::
+    :tags: bug, sqlite
+    :tickets: 13130
+
+    Fixed issue where :meth:`_sqlite.Insert.on_conflict_do_update`
+    using parametrized bound parameters in the ``set_`` clause would fail
+    when used with executemany batching. Row-at-a-time mode is now used
+    for ON CONFLICT statements with RETURNING that contain parametrized
+    bindparams, ensuring each row's parameters are correctly applied. ON
+    CONFLICT statements using expressions like ``excluded.<column>``
+    continue to batch normally.
index 7746ca22467c4dc844d90b65ecf9a1bbfc78f630..a5a15bb361c9553c89a85a97cc3243f2934b24ff 100644 (file)
@@ -2250,7 +2250,9 @@ class PGCompiler(compiler.SQLCompiler):
                 ):
                     value = value._clone()
                     value.type = c.type
-            value_text = self.process(value.self_group(), **set_kw)
+            value_text = self.process(
+                value.self_group(), is_upsert_set=True, **set_kw
+            )
 
             key_text = self.preparer.quote(c.name)
             action_set_ops.append("%s = %s" % (key_text, value_text))
@@ -2273,6 +2275,7 @@ class PGCompiler(compiler.SQLCompiler):
                 )
                 value_text = self.process(
                     coercions.expect(roles.ExpressionElementRole, v),
+                    is_upsert_set=True,
                     **set_kw,
                 )
                 action_set_ops.append("%s = %s" % (key_text, value_text))
index e221f1aed6a3ec12ad27a03b2cc9a10a00a83765..35eb74bfad16e3c7c2ab68e2f335fe6fbca98d38 100644 (file)
@@ -1650,7 +1650,9 @@ class SQLiteCompiler(compiler.SQLCompiler):
                 ):
                     value = value._clone()
                     value.type = c.type
-            value_text = self.process(value.self_group(), **set_kw)
+            value_text = self.process(
+                value.self_group(), is_upsert_set=True, **set_kw
+            )
 
             key_text = self.preparer.quote(c.name)
             action_set_ops.append("%s = %s" % (key_text, value_text))
@@ -1673,6 +1675,7 @@ class SQLiteCompiler(compiler.SQLCompiler):
                 )
                 value_text = self.process(
                     coercions.expect(roles.ExpressionElementRole, v),
+                    is_upsert_set=True,
                     **set_kw,
                 )
                 action_set_ops.append("%s = %s" % (key_text, value_text))
index d4ac047fe20ada83a9e984ea9db2c6d0109c7c08..7819d51ba8b243d17d03ca337bdb66b2fe402717 100644 (file)
@@ -592,6 +592,19 @@ class _InsertManyValues(NamedTuple):
 
     """
 
+    has_upsert_bound_parameters: bool = False
+    """if True, the upsert SET clause contains bound parameters that will
+    receive their values from the parameters dict (i.e., parametrized
+    bindparams where value is None and callable is None).
+
+    This means we can't batch multiple rows in a single statement, since
+    each row would need different values in the SET clause but there's only
+    one SET clause per statement. See issue #13130.
+
+    .. versionadded:: 2.0.37
+
+    """
+
     embed_values_counter: bool = False
     """Whether to embed an incrementing integer counter in each parameter
     set within the VALUES clause as parameters are batched over.
@@ -3675,8 +3688,19 @@ class SQLCompiler(Compiled):
         skip_bind_expression=False,
         literal_execute=False,
         render_postcompile=False,
+        is_upsert_set=False,
         **kwargs,
     ):
+        # Detect parametrized bindparams in upsert SET clause for issue #13130
+        if (
+            is_upsert_set
+            and bindparam.value is None
+            and bindparam.callable is None
+            and self._insertmanyvalues is not None
+        ):
+            self._insertmanyvalues = self._insertmanyvalues._replace(
+                has_upsert_bound_parameters=True
+            )
 
         if not skip_bind_expression:
             impl = bindparam.type.dialect_impl(self.dialect)
@@ -5555,6 +5579,19 @@ class SQLCompiler(Compiled):
             # so we can use batch mode even with upsert behaviors.
             use_row_at_a_time = True
             downgraded = True
+        elif (
+            imv.has_upsert_bound_parameters
+            and not imv.embed_values_counter
+            and self._result_columns
+        ):
+            # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
+            # and parametrized bindparams in the SET clause, we must use
+            # row-at-a-time. Batching multiple rows in a single statement
+            # doesn't work when the SET clause contains bound parameters that
+            # will receive different values per row, as there's only one SET
+            # clause per statement. See issue #13130.
+            use_row_at_a_time = True
+            downgraded = True
         else:
             use_row_at_a_time = False
             downgraded = False
@@ -5679,6 +5716,7 @@ class SQLCompiler(Compiled):
                 key: parameters[0][key]
                 for key in all_keys.difference(keys_to_replace)
             }
+
             executemany_values_w_comma = ""
         else:
             formatted_values_clause = ""
index 92c5432cdc29ec9e6da61f15d5a35f82700cd568..d37274c8e08e9ebd31a257de9f2a893ccad4d3d3 100644 (file)
@@ -1661,7 +1661,15 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel):
             and compiler.for_executemany
             and dialect.use_insertmanyvalues
             and (
-                explicit_returning or dialect.use_insertmanyvalues_wo_returning
+                explicit_returning
+                or (
+                    dialect.use_insertmanyvalues_wo_returning
+                    # Disable insertmanyvalues_wo_returning when there's a
+                    # post-values clause like ON CONFLICT DO UPDATE.
+                    # This is a performance optimization flag and the batching
+                    # doesn't work correctly with these clauses. See #13130.
+                    and stmt._post_values_clause is None
+                )
             )
         )
 
index 7e71ac72d57c071e8adeef44b5d301622e8600f2..7f8a21f8ae52aa073f1b87ea7c87e5e1c9c77412 100644 (file)
@@ -1,3 +1,4 @@
+from sqlalchemy import bindparam
 from sqlalchemy import Column
 from sqlalchemy import exc
 from sqlalchemy import Integer
@@ -7,6 +8,7 @@ from sqlalchemy import String
 from sqlalchemy import Table
 from sqlalchemy import testing
 from sqlalchemy import types as sqltypes
+from sqlalchemy import UniqueConstraint
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.testing import config
 from sqlalchemy.testing import fixtures
@@ -906,3 +908,144 @@ class OnConflictTest(fixtures.TablesTest, AssertsExecutionResults):
                 (id4, "name4"),
             ],
         )
+
+    @testing.variation("use_returning", [True, False])
+    @testing.variation("bindtype", ["samename", "differentname", "fixed"])
+    def test_on_conflict_do_update_bindparam(
+        self, connection, metadata, use_returning, bindtype
+    ):
+        """Test issue #13130 - ON CONFLICT DO UPDATE with various bindparam
+        patterns.
+
+        Tests insertmanyvalues batching behavior with ON CONFLICT DO UPDATE:
+
+        - samename: bindparam with same name in VALUES and SET
+        - differentname: bindparam with different names in VALUES vs SET
+        - fixed: bindparam with fixed internal value in SET - should batch
+          normally
+
+        Expected insertmanyvalues behavior:
+
+        - samename/differentname + use_returning: row-at-a-time (batch_size=1)
+        - samename/differentname + !use_returning: insertmanyvalues disabled
+        - fixed + use_returning: normal batching
+        - fixed + !use_returning: insertmanyvalues disabled
+        """
+        t = Table(
+            "test_upsert_params",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("name", String(50)),
+            Column("data", String(50)),
+            UniqueConstraint("name", name="uq_test_upsert_params"),
+        )
+        t.create(connection)
+
+        # Build the statement based on bindtype
+        stmt = insert(t).values({"name": bindparam("name")})
+
+        if bindtype.samename:
+            stmt = stmt.on_conflict_do_update(
+                set_={"name": bindparam("name")},
+                constraint="uq_test_upsert_params",
+            )
+            params_insert = [{"name": "Foo"}, {"name": "Bar"}]
+            params_update = [{"name": "Foo"}, {"name": "Bar"}]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar", None), ("Foo", None)]
+        elif bindtype.differentname:
+            stmt = insert(t).values({"name": bindparam("name1")})
+            stmt = stmt.on_conflict_do_update(
+                set_={"name": bindparam("name2")},
+                constraint="uq_test_upsert_params",
+            )
+            params_insert = [
+                {"name1": "Foo", "name2": "Foo"},
+                {"name1": "Bar", "name2": "Bar"},
+            ]
+            params_update = [
+                {"name1": "Foo", "name2": "Foo_updated"},
+                {"name1": "Bar", "name2": "Bar_updated"},
+            ]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar_updated", None), ("Foo_updated", None)]
+        else:  # bindtype.fixed
+            stmt = stmt.on_conflict_do_update(
+                set_={"data": "newdata"},
+                constraint="uq_test_upsert_params",
+            )
+            params_insert = [{"name": "Foo"}, {"name": "Bar"}]
+            params_update = [{"name": "Foo"}, {"name": "Bar"}]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar", "newdata"), ("Foo", "newdata")]
+
+        if use_returning:
+            stmt = stmt.returning(t.c.id, t.c.name, t.c.data)
+
+        # Initial insert
+        result = connection.execute(stmt, params_insert)
+
+        # Verify _insertmanyvalues state
+        compiled = result.context.compiled
+        if use_returning:
+            # With RETURNING, insertmanyvalues should be enabled
+            assert compiled._insertmanyvalues is not None
+            if bindtype.samename or bindtype.differentname:
+                # Parametrized bindparams - flag should be True
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    True,
+                )
+            else:  # bindtype.fixed
+                # Fixed value bindparam - flag should be False
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    False,
+                )
+        else:
+            # Without RETURNING, insertmanyvalues is disabled for ON CONFLICT
+            eq_(compiled._insertmanyvalues, None)
+
+        if use_returning:
+            rows = result.all()
+            eq_(len(rows), 2)
+            eq_(sorted([(r[1], r[2]) for r in rows]), expected_initial)
+
+        eq_(
+            connection.execute(
+                sql.select(t.c.name, t.c.data).order_by(t.c.name)
+            ).fetchall(),
+            expected_initial,
+        )
+
+        # Test the conflict scenario - update existing rows
+        result = connection.execute(stmt, params_update)
+
+        # Verify _insertmanyvalues state for update scenario
+        compiled = result.context.compiled
+        if use_returning:
+            assert compiled._insertmanyvalues is not None
+            if bindtype.samename or bindtype.differentname:
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    True,
+                )
+            else:  # bindtype.fixed
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    False,
+                )
+        else:
+            eq_(compiled._insertmanyvalues, None)
+
+        if use_returning:
+            rows = result.all()
+            eq_(len(rows), 2)
+            eq_(sorted([(r[1], r[2]) for r in rows]), expected_updated)
+
+        eq_(
+            connection.execute(
+                sql.select(t.c.name, t.c.data).order_by(t.c.name)
+            ).fetchall(),
+            expected_updated,
+        )
index b0c6712aa57669068dea76230ab673ded6ed5b09..782cd0148abd9bc9bb069ebf967f9f90ff9b2f99 100644 (file)
@@ -1,5 +1,6 @@
 """SQLite-specific tests."""
 
+from sqlalchemy import bindparam
 from sqlalchemy import Column
 from sqlalchemy import exc
 from sqlalchemy import schema
@@ -7,6 +8,7 @@ from sqlalchemy import sql
 from sqlalchemy import Table
 from sqlalchemy import testing
 from sqlalchemy import types as sqltypes
+from sqlalchemy import UniqueConstraint
 from sqlalchemy.dialects.sqlite import insert
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import eq_
@@ -632,3 +634,141 @@ class OnConflictTest(fixtures.TablesTest):
             conn.scalar(sql.select(bind_targets.c.data)),
             "new updated data processed",
         )
+
+    @testing.variation("use_returning", [True, False])
+    @testing.variation("bindtype", ["samename", "differentname", "fixed"])
+    def test_on_conflict_do_update_bindparam(
+        self, connection, metadata, use_returning, bindtype
+    ):
+        """Test issue #13130 - ON CONFLICT DO UPDATE with various bindparam
+        patterns.
+
+        Tests insertmanyvalues batching behavior with ON CONFLICT DO UPDATE:
+
+        - samename: bindparam with same name in VALUES and SET
+        - differentname: bindparam with different names in VALUES vs SET
+        - fixed: bindparam with fixed internal value in SET - should batch
+          normally
+
+        Expected insertmanyvalues behavior:
+
+        - samename/differentname + use_returning: row-at-a-time (batch_size=1)
+        - samename/differentname + !use_returning: insertmanyvalues disabled
+        - fixed + use_returning: normal batching
+        - fixed + !use_returning: insertmanyvalues disabled
+        """
+        t = Table(
+            "test_upsert_params",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("name", String(50)),
+            Column("data", String(50)),
+            UniqueConstraint("name", name="uq_test_upsert_params"),
+        )
+        t.create(connection)
+
+        # Build the statement based on bindtype
+        stmt = insert(t).values({"name": bindparam("name")})
+
+        if bindtype.samename:
+            stmt = stmt.on_conflict_do_update(
+                set_={"name": bindparam("name")}, index_elements=["name"]
+            )
+            params_insert = [{"name": "Foo"}, {"name": "Bar"}]
+            params_update = [{"name": "Foo"}, {"name": "Bar"}]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar", None), ("Foo", None)]
+        elif bindtype.differentname:
+            stmt = insert(t).values({"name": bindparam("name1")})
+            stmt = stmt.on_conflict_do_update(
+                set_={"name": bindparam("name2")}, index_elements=["name"]
+            )
+            params_insert = [
+                {"name1": "Foo", "name2": "Foo"},
+                {"name1": "Bar", "name2": "Bar"},
+            ]
+            params_update = [
+                {"name1": "Foo", "name2": "Foo_updated"},
+                {"name1": "Bar", "name2": "Bar_updated"},
+            ]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar_updated", None), ("Foo_updated", None)]
+        else:  # bindtype.fixed
+            stmt = stmt.on_conflict_do_update(
+                set_={"data": "newdata"}, index_elements=["name"]
+            )
+            params_insert = [{"name": "Foo"}, {"name": "Bar"}]
+            params_update = [{"name": "Foo"}, {"name": "Bar"}]
+            expected_initial = [("Bar", None), ("Foo", None)]
+            expected_updated = [("Bar", "newdata"), ("Foo", "newdata")]
+
+        if use_returning:
+            stmt = stmt.returning(t.c.id, t.c.name, t.c.data)
+
+        # Initial insert
+        result = connection.execute(stmt, params_insert)
+
+        # Verify _insertmanyvalues state
+        compiled = result.context.compiled
+        if use_returning:
+            # With RETURNING, insertmanyvalues should be enabled
+            assert compiled._insertmanyvalues is not None
+            if bindtype.samename or bindtype.differentname:
+                # Parametrized bindparams - flag should be True
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    True,
+                )
+            else:  # bindtype.fixed
+                # Fixed value bindparam - flag should be False
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    False,
+                )
+        else:
+            # Without RETURNING, insertmanyvalues is disabled for ON CONFLICT
+            eq_(compiled._insertmanyvalues, None)
+
+        if use_returning:
+            rows = result.all()
+            eq_(len(rows), 2)
+            eq_(sorted([(r[1], r[2]) for r in rows]), expected_initial)
+
+        eq_(
+            connection.execute(
+                sql.select(t.c.name, t.c.data).order_by(t.c.name)
+            ).fetchall(),
+            expected_initial,
+        )
+
+        # Test the conflict scenario - update existing rows
+        result = connection.execute(stmt, params_update)
+
+        # Verify _insertmanyvalues state for update scenario
+        compiled = result.context.compiled
+        if use_returning:
+            assert compiled._insertmanyvalues is not None
+            if bindtype.samename or bindtype.differentname:
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    True,
+                )
+            else:  # bindtype.fixed
+                eq_(
+                    compiled._insertmanyvalues.has_upsert_bound_parameters,
+                    False,
+                )
+        else:
+            eq_(compiled._insertmanyvalues, None)
+
+        if use_returning:
+            rows = result.all()
+            eq_(len(rows), 2)
+            eq_(sorted([(r[1], r[2]) for r in rows]), expected_updated)
+
+        eq_(
+            connection.execute(
+                sql.select(t.c.name, t.c.data).order_by(t.c.name)
+            ).fetchall(),
+            expected_updated,
+        )