From: Mike Bayer Date: Tue, 17 Feb 2026 20:58:22 +0000 (-0500) Subject: downgrade batches for bindparam() in SET X-Git-Tag: rel_2_0_47~4 X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=b481c9ae9cf5887e313baed5c8fb4701528806ef;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git downgrade batches for bindparam() in SET Fixed issue where :meth:`_postgresql.Insert.on_conflict_do_update` using parametrized bound parameters in the ``set_`` clause would fail when used with executemany batching. For dialects that use the ``use_insertmanyvalues_wo_returning`` optimization (psycopg2), insertmanyvalues is now disabled when there is an ON CONFLICT clause. For cases with RETURNING, row-at-a-time mode is used when the SET clause contains parametrized bindparams (bindparams that receive values from the parameters dict), ensuring each row's parameters are correctly applied. ON CONFLICT statements using expressions like ``excluded.`` continue to batch normally. Fixed issue where :meth:`_sqlite.Insert.on_conflict_do_update` using parametrized bound parameters in the ``set_`` clause would fail when used with executemany batching. Row-at-a-time mode is now used for ON CONFLICT statements with RETURNING that contain parametrized bindparams, ensuring each row's parameters are correctly applied. ON CONFLICT statements using expressions like ``excluded.`` continue to batch normally. Fixes: #13130 Change-Id: I0c5a9142401c745d38e58d071c16e53610f9bfea (cherry picked from commit 5b2e7aae01cc2e55e68a8445569ee709b17715dd) --- diff --git a/doc/build/changelog/unreleased_20/13130.rst b/doc/build/changelog/unreleased_20/13130.rst new file mode 100644 index 0000000000..0a9febc88f --- /dev/null +++ b/doc/build/changelog/unreleased_20/13130.rst @@ -0,0 +1,27 @@ +.. change:: + :tags: bug, postgresql + :tickets: 13130 + + Fixed issue where :meth:`_postgresql.Insert.on_conflict_do_update` + using parametrized bound parameters in the ``set_`` clause would fail + when used with executemany batching. For dialects that use the + ``use_insertmanyvalues_wo_returning`` optimization (psycopg2), + insertmanyvalues is now disabled when there is an ON CONFLICT clause. + For cases with RETURNING, row-at-a-time mode is used when the SET + clause contains parametrized bindparams (bindparams that receive + values from the parameters dict), ensuring each row's parameters are + correctly applied. ON CONFLICT statements using expressions like + ``excluded.`` continue to batch normally. + + +.. change:: + :tags: bug, sqlite + :tickets: 13130 + + Fixed issue where :meth:`_sqlite.Insert.on_conflict_do_update` + using parametrized bound parameters in the ``set_`` clause would fail + when used with executemany batching. Row-at-a-time mode is now used + for ON CONFLICT statements with RETURNING that contain parametrized + bindparams, ensuring each row's parameters are correctly applied. ON + CONFLICT statements using expressions like ``excluded.`` + continue to batch normally. diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 7746ca2246..a5a15bb361 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -2250,7 +2250,9 @@ class PGCompiler(compiler.SQLCompiler): ): value = value._clone() value.type = c.type - value_text = self.process(value.self_group(), **set_kw) + value_text = self.process( + value.self_group(), is_upsert_set=True, **set_kw + ) key_text = self.preparer.quote(c.name) action_set_ops.append("%s = %s" % (key_text, value_text)) @@ -2273,6 +2275,7 @@ class PGCompiler(compiler.SQLCompiler): ) value_text = self.process( coercions.expect(roles.ExpressionElementRole, v), + is_upsert_set=True, **set_kw, ) action_set_ops.append("%s = %s" % (key_text, value_text)) diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index e221f1aed6..35eb74bfad 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1650,7 +1650,9 @@ class SQLiteCompiler(compiler.SQLCompiler): ): value = value._clone() value.type = c.type - value_text = self.process(value.self_group(), **set_kw) + value_text = self.process( + value.self_group(), is_upsert_set=True, **set_kw + ) key_text = self.preparer.quote(c.name) action_set_ops.append("%s = %s" % (key_text, value_text)) @@ -1673,6 +1675,7 @@ class SQLiteCompiler(compiler.SQLCompiler): ) value_text = self.process( coercions.expect(roles.ExpressionElementRole, v), + is_upsert_set=True, **set_kw, ) action_set_ops.append("%s = %s" % (key_text, value_text)) diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index d4ac047fe2..7819d51ba8 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -592,6 +592,19 @@ class _InsertManyValues(NamedTuple): """ + has_upsert_bound_parameters: bool = False + """if True, the upsert SET clause contains bound parameters that will + receive their values from the parameters dict (i.e., parametrized + bindparams where value is None and callable is None). + + This means we can't batch multiple rows in a single statement, since + each row would need different values in the SET clause but there's only + one SET clause per statement. See issue #13130. + + .. versionadded:: 2.0.37 + + """ + embed_values_counter: bool = False """Whether to embed an incrementing integer counter in each parameter set within the VALUES clause as parameters are batched over. @@ -3675,8 +3688,19 @@ class SQLCompiler(Compiled): skip_bind_expression=False, literal_execute=False, render_postcompile=False, + is_upsert_set=False, **kwargs, ): + # Detect parametrized bindparams in upsert SET clause for issue #13130 + if ( + is_upsert_set + and bindparam.value is None + and bindparam.callable is None + and self._insertmanyvalues is not None + ): + self._insertmanyvalues = self._insertmanyvalues._replace( + has_upsert_bound_parameters=True + ) if not skip_bind_expression: impl = bindparam.type.dialect_impl(self.dialect) @@ -5555,6 +5579,19 @@ class SQLCompiler(Compiled): # so we can use batch mode even with upsert behaviors. use_row_at_a_time = True downgraded = True + elif ( + imv.has_upsert_bound_parameters + and not imv.embed_values_counter + and self._result_columns + ): + # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING + # and parametrized bindparams in the SET clause, we must use + # row-at-a-time. Batching multiple rows in a single statement + # doesn't work when the SET clause contains bound parameters that + # will receive different values per row, as there's only one SET + # clause per statement. See issue #13130. + use_row_at_a_time = True + downgraded = True else: use_row_at_a_time = False downgraded = False @@ -5679,6 +5716,7 @@ class SQLCompiler(Compiled): key: parameters[0][key] for key in all_keys.difference(keys_to_replace) } + executemany_values_w_comma = "" else: formatted_values_clause = "" diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py index 92c5432cdc..d37274c8e0 100644 --- a/lib/sqlalchemy/sql/crud.py +++ b/lib/sqlalchemy/sql/crud.py @@ -1661,7 +1661,15 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel): and compiler.for_executemany and dialect.use_insertmanyvalues and ( - explicit_returning or dialect.use_insertmanyvalues_wo_returning + explicit_returning + or ( + dialect.use_insertmanyvalues_wo_returning + # Disable insertmanyvalues_wo_returning when there's a + # post-values clause like ON CONFLICT DO UPDATE. + # This is a performance optimization flag and the batching + # doesn't work correctly with these clauses. See #13130. + and stmt._post_values_clause is None + ) ) ) diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py index 7e71ac72d5..7f8a21f8ae 100644 --- a/test/dialect/postgresql/test_on_conflict.py +++ b/test/dialect/postgresql/test_on_conflict.py @@ -1,3 +1,4 @@ +from sqlalchemy import bindparam from sqlalchemy import Column from sqlalchemy import exc from sqlalchemy import Integer @@ -7,6 +8,7 @@ from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import types as sqltypes +from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.postgresql import insert from sqlalchemy.testing import config from sqlalchemy.testing import fixtures @@ -906,3 +908,144 @@ class OnConflictTest(fixtures.TablesTest, AssertsExecutionResults): (id4, "name4"), ], ) + + @testing.variation("use_returning", [True, False]) + @testing.variation("bindtype", ["samename", "differentname", "fixed"]) + def test_on_conflict_do_update_bindparam( + self, connection, metadata, use_returning, bindtype + ): + """Test issue #13130 - ON CONFLICT DO UPDATE with various bindparam + patterns. + + Tests insertmanyvalues batching behavior with ON CONFLICT DO UPDATE: + + - samename: bindparam with same name in VALUES and SET + - differentname: bindparam with different names in VALUES vs SET + - fixed: bindparam with fixed internal value in SET - should batch + normally + + Expected insertmanyvalues behavior: + + - samename/differentname + use_returning: row-at-a-time (batch_size=1) + - samename/differentname + !use_returning: insertmanyvalues disabled + - fixed + use_returning: normal batching + - fixed + !use_returning: insertmanyvalues disabled + """ + t = Table( + "test_upsert_params", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + Column("data", String(50)), + UniqueConstraint("name", name="uq_test_upsert_params"), + ) + t.create(connection) + + # Build the statement based on bindtype + stmt = insert(t).values({"name": bindparam("name")}) + + if bindtype.samename: + stmt = stmt.on_conflict_do_update( + set_={"name": bindparam("name")}, + constraint="uq_test_upsert_params", + ) + params_insert = [{"name": "Foo"}, {"name": "Bar"}] + params_update = [{"name": "Foo"}, {"name": "Bar"}] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar", None), ("Foo", None)] + elif bindtype.differentname: + stmt = insert(t).values({"name": bindparam("name1")}) + stmt = stmt.on_conflict_do_update( + set_={"name": bindparam("name2")}, + constraint="uq_test_upsert_params", + ) + params_insert = [ + {"name1": "Foo", "name2": "Foo"}, + {"name1": "Bar", "name2": "Bar"}, + ] + params_update = [ + {"name1": "Foo", "name2": "Foo_updated"}, + {"name1": "Bar", "name2": "Bar_updated"}, + ] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar_updated", None), ("Foo_updated", None)] + else: # bindtype.fixed + stmt = stmt.on_conflict_do_update( + set_={"data": "newdata"}, + constraint="uq_test_upsert_params", + ) + params_insert = [{"name": "Foo"}, {"name": "Bar"}] + params_update = [{"name": "Foo"}, {"name": "Bar"}] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar", "newdata"), ("Foo", "newdata")] + + if use_returning: + stmt = stmt.returning(t.c.id, t.c.name, t.c.data) + + # Initial insert + result = connection.execute(stmt, params_insert) + + # Verify _insertmanyvalues state + compiled = result.context.compiled + if use_returning: + # With RETURNING, insertmanyvalues should be enabled + assert compiled._insertmanyvalues is not None + if bindtype.samename or bindtype.differentname: + # Parametrized bindparams - flag should be True + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + True, + ) + else: # bindtype.fixed + # Fixed value bindparam - flag should be False + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + False, + ) + else: + # Without RETURNING, insertmanyvalues is disabled for ON CONFLICT + eq_(compiled._insertmanyvalues, None) + + if use_returning: + rows = result.all() + eq_(len(rows), 2) + eq_(sorted([(r[1], r[2]) for r in rows]), expected_initial) + + eq_( + connection.execute( + sql.select(t.c.name, t.c.data).order_by(t.c.name) + ).fetchall(), + expected_initial, + ) + + # Test the conflict scenario - update existing rows + result = connection.execute(stmt, params_update) + + # Verify _insertmanyvalues state for update scenario + compiled = result.context.compiled + if use_returning: + assert compiled._insertmanyvalues is not None + if bindtype.samename or bindtype.differentname: + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + True, + ) + else: # bindtype.fixed + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + False, + ) + else: + eq_(compiled._insertmanyvalues, None) + + if use_returning: + rows = result.all() + eq_(len(rows), 2) + eq_(sorted([(r[1], r[2]) for r in rows]), expected_updated) + + eq_( + connection.execute( + sql.select(t.c.name, t.c.data).order_by(t.c.name) + ).fetchall(), + expected_updated, + ) diff --git a/test/dialect/sqlite/test_on_conflict.py b/test/dialect/sqlite/test_on_conflict.py index b0c6712aa5..782cd0148a 100644 --- a/test/dialect/sqlite/test_on_conflict.py +++ b/test/dialect/sqlite/test_on_conflict.py @@ -1,5 +1,6 @@ """SQLite-specific tests.""" +from sqlalchemy import bindparam from sqlalchemy import Column from sqlalchemy import exc from sqlalchemy import schema @@ -7,6 +8,7 @@ from sqlalchemy import sql from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import types as sqltypes +from sqlalchemy import UniqueConstraint from sqlalchemy.dialects.sqlite import insert from sqlalchemy.testing import assert_raises from sqlalchemy.testing import eq_ @@ -632,3 +634,141 @@ class OnConflictTest(fixtures.TablesTest): conn.scalar(sql.select(bind_targets.c.data)), "new updated data processed", ) + + @testing.variation("use_returning", [True, False]) + @testing.variation("bindtype", ["samename", "differentname", "fixed"]) + def test_on_conflict_do_update_bindparam( + self, connection, metadata, use_returning, bindtype + ): + """Test issue #13130 - ON CONFLICT DO UPDATE with various bindparam + patterns. + + Tests insertmanyvalues batching behavior with ON CONFLICT DO UPDATE: + + - samename: bindparam with same name in VALUES and SET + - differentname: bindparam with different names in VALUES vs SET + - fixed: bindparam with fixed internal value in SET - should batch + normally + + Expected insertmanyvalues behavior: + + - samename/differentname + use_returning: row-at-a-time (batch_size=1) + - samename/differentname + !use_returning: insertmanyvalues disabled + - fixed + use_returning: normal batching + - fixed + !use_returning: insertmanyvalues disabled + """ + t = Table( + "test_upsert_params", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + Column("data", String(50)), + UniqueConstraint("name", name="uq_test_upsert_params"), + ) + t.create(connection) + + # Build the statement based on bindtype + stmt = insert(t).values({"name": bindparam("name")}) + + if bindtype.samename: + stmt = stmt.on_conflict_do_update( + set_={"name": bindparam("name")}, index_elements=["name"] + ) + params_insert = [{"name": "Foo"}, {"name": "Bar"}] + params_update = [{"name": "Foo"}, {"name": "Bar"}] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar", None), ("Foo", None)] + elif bindtype.differentname: + stmt = insert(t).values({"name": bindparam("name1")}) + stmt = stmt.on_conflict_do_update( + set_={"name": bindparam("name2")}, index_elements=["name"] + ) + params_insert = [ + {"name1": "Foo", "name2": "Foo"}, + {"name1": "Bar", "name2": "Bar"}, + ] + params_update = [ + {"name1": "Foo", "name2": "Foo_updated"}, + {"name1": "Bar", "name2": "Bar_updated"}, + ] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar_updated", None), ("Foo_updated", None)] + else: # bindtype.fixed + stmt = stmt.on_conflict_do_update( + set_={"data": "newdata"}, index_elements=["name"] + ) + params_insert = [{"name": "Foo"}, {"name": "Bar"}] + params_update = [{"name": "Foo"}, {"name": "Bar"}] + expected_initial = [("Bar", None), ("Foo", None)] + expected_updated = [("Bar", "newdata"), ("Foo", "newdata")] + + if use_returning: + stmt = stmt.returning(t.c.id, t.c.name, t.c.data) + + # Initial insert + result = connection.execute(stmt, params_insert) + + # Verify _insertmanyvalues state + compiled = result.context.compiled + if use_returning: + # With RETURNING, insertmanyvalues should be enabled + assert compiled._insertmanyvalues is not None + if bindtype.samename or bindtype.differentname: + # Parametrized bindparams - flag should be True + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + True, + ) + else: # bindtype.fixed + # Fixed value bindparam - flag should be False + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + False, + ) + else: + # Without RETURNING, insertmanyvalues is disabled for ON CONFLICT + eq_(compiled._insertmanyvalues, None) + + if use_returning: + rows = result.all() + eq_(len(rows), 2) + eq_(sorted([(r[1], r[2]) for r in rows]), expected_initial) + + eq_( + connection.execute( + sql.select(t.c.name, t.c.data).order_by(t.c.name) + ).fetchall(), + expected_initial, + ) + + # Test the conflict scenario - update existing rows + result = connection.execute(stmt, params_update) + + # Verify _insertmanyvalues state for update scenario + compiled = result.context.compiled + if use_returning: + assert compiled._insertmanyvalues is not None + if bindtype.samename or bindtype.differentname: + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + True, + ) + else: # bindtype.fixed + eq_( + compiled._insertmanyvalues.has_upsert_bound_parameters, + False, + ) + else: + eq_(compiled._insertmanyvalues, None) + + if use_returning: + rows = result.all() + eq_(len(rows), 2) + eq_(sorted([(r[1], r[2]) for r in rows]), expected_updated) + + eq_( + connection.execute( + sql.select(t.c.name, t.c.data).order_by(t.c.name) + ).fetchall(), + expected_updated, + )