From 574facaaf4207b952379c28673c44b62835535fb Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Tue, 3 Feb 2026 21:07:59 -0500 Subject: [PATCH] allow batch with upsert if embed_values_counter is True Fixed issue in the :ref:`engine_insertmanyvalues` feature where using PostgreSQL's ``ON CONFLICT`` clause with :paramref:`_dml.Insert.returning.sort_by_parameter_order` enabled would generate invalid SQL when the insert used an implicit sentinel (server-side autoincrement primary key). The generated SQL would incorrectly declare a sentinel counter column in the ``imp_sen`` table alias without providing corresponding values in the ``VALUES`` clause, leading to a ``ProgrammingError`` indicating column count mismatch. The fix allows batch execution mode when ``embed_values_counter`` is active, as the embedded counter provides the ordering capability needed even with upsert behaviors, rather than unnecessarily downgrading to row-at-a-time execution. Fixes: #13107 Change-Id: I382472b2cf2991b520344adea5783584e27425d0 --- doc/build/changelog/unreleased_20/13107.rst | 15 ++ lib/sqlalchemy/dialects/mysql/provision.py | 8 +- .../dialects/postgresql/provision.py | 12 +- lib/sqlalchemy/dialects/sqlite/provision.py | 8 +- lib/sqlalchemy/sql/compiler.py | 12 +- lib/sqlalchemy/testing/provision.py | 8 +- test/dialect/postgresql/test_on_conflict.py | 134 +++++++++++++++++- test/sql/test_insert_exec.py | 121 +++++++++++++++- 8 files changed, 309 insertions(+), 9 deletions(-) create mode 100644 doc/build/changelog/unreleased_20/13107.rst diff --git a/doc/build/changelog/unreleased_20/13107.rst b/doc/build/changelog/unreleased_20/13107.rst new file mode 100644 index 0000000000..0d877d904f --- /dev/null +++ b/doc/build/changelog/unreleased_20/13107.rst @@ -0,0 +1,15 @@ +.. change:: + :tags: bug, postgresql + :tickets: 13107 + + Fixed issue in the :ref:`engine_insertmanyvalues` feature where using + PostgreSQL's ``ON CONFLICT`` clause with + :paramref:`_dml.Insert.returning.sort_by_parameter_order` enabled would + generate invalid SQL when the insert used an implicit sentinel (server-side + autoincrement primary key). The generated SQL would incorrectly declare a + sentinel counter column in the ``imp_sen`` table alias without providing + corresponding values in the ``VALUES`` clause, leading to a + ``ProgrammingError`` indicating column count mismatch. The fix allows batch + execution mode when ``embed_values_counter`` is active, as the embedded + counter provides the ordering capability needed even with upsert behaviors, + rather than unnecessarily downgrading to row-at-a-time execution. diff --git a/lib/sqlalchemy/dialects/mysql/provision.py b/lib/sqlalchemy/dialects/mysql/provision.py index e828cdcfab..46cd8abf7d 100644 --- a/lib/sqlalchemy/dialects/mysql/provision.py +++ b/lib/sqlalchemy/dialects/mysql/provision.py @@ -100,7 +100,13 @@ def _mysql_temp_table_keyword_args(cfg, eng): @upsert.for_db("mariadb") def _upsert( - cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False + cfg, + table, + returning, + *, + set_lambda=None, + sort_by_parameter_order=False, + index_elements=None, ): from sqlalchemy.dialects.mysql import insert diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py index 2ea7e974f4..dfe0f627d2 100644 --- a/lib/sqlalchemy/dialects/postgresql/provision.py +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -137,7 +137,13 @@ def prepare_for_drop_tables(config, connection): @upsert.for_db("postgresql") def _upsert( - cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False + cfg, + table, + returning, + *, + set_lambda=None, + sort_by_parameter_order=False, + index_elements=None, ): from sqlalchemy.dialects.postgresql import insert @@ -146,8 +152,10 @@ def _upsert( table_pk = inspect(table).selectable if set_lambda: + if index_elements is None: + index_elements = table_pk.primary_key stmt = stmt.on_conflict_do_update( - index_elements=table_pk.primary_key, set_=set_lambda(stmt.excluded) + index_elements=index_elements, set_=set_lambda(stmt.excluded) ) else: stmt = stmt.on_conflict_do_nothing() diff --git a/lib/sqlalchemy/dialects/sqlite/provision.py b/lib/sqlalchemy/dialects/sqlite/provision.py index 7cd8847842..fa63ab5ed9 100644 --- a/lib/sqlalchemy/dialects/sqlite/provision.py +++ b/lib/sqlalchemy/dialects/sqlite/provision.py @@ -206,7 +206,13 @@ def _reap_sqlite_dbs(url, idents): @upsert.for_db("sqlite") def _upsert( - cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False + cfg, + table, + returning, + *, + set_lambda=None, + sort_by_parameter_order=False, + index_elements=None, ): from sqlalchemy.dialects.sqlite import insert diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 31c8ac8b8a..4fc2d2aa36 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -5735,11 +5735,19 @@ class SQLCompiler(Compiled): elif not self.dialect.supports_multivalues_insert or ( sort_by_parameter_order and self._result_columns - and (imv.sentinel_columns is None or imv.includes_upsert_behaviors) + and ( + imv.sentinel_columns is None + or ( + imv.includes_upsert_behaviors + and not imv.embed_values_counter + ) + ) ): # deterministic order was requested and the compiler could # not organize sentinel columns for this dialect/statement. - # use row at a time + # use row at a time. Note: if embed_values_counter is True, + # the counter itself provides the ordering capability we need, + # so we can use batch mode even with upsert behaviors. use_row_at_a_time = True downgraded = True else: diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index 1f88686e66..97d64ceb1e 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -517,7 +517,13 @@ def set_default_schema_on_connection(cfg, dbapi_connection, schema_name): @register.init def upsert( - cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False + cfg, + table, + returning, + *, + set_lambda=None, + sort_by_parameter_order=False, + index_elements=None, ): """return the backends insert..on conflict / on dupe etc. construct. diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py index 691f6c3962..7e71ac72d5 100644 --- a/test/dialect/postgresql/test_on_conflict.py +++ b/test/dialect/postgresql/test_on_conflict.py @@ -11,10 +11,12 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.testing import config from sqlalchemy.testing import fixtures from sqlalchemy.testing.assertions import assert_raises +from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.assertsql import CursorSQL -class OnConflictTest(fixtures.TablesTest): +class OnConflictTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = ("postgresql >= 9.5",) __backend__ = True run_define_tables = "each" @@ -774,3 +776,133 @@ class OnConflictTest(fixtures.TablesTest): connection.scalar(sql.select(bind_targets.c.data)), "new updated data processed", ) + + def test_on_conflict_do_update_multirow_returning_ordered( + self, connection + ): + """Test that ON CONFLICT works with multiple rows, + RETURNING, and sort_by_parameter_order=True. + + This is a regression test for issue #13107 where the + insertmanyvalues sentinel counter was not being added + to the VALUES clause when on_conflict_do_update was + present with sort_by_parameter_order=True and the + primary key was autoincrement (not provided in data). + """ + users_xtra = self.tables.users_xtra + + stmt = insert(users_xtra) + stmt = stmt.on_conflict_do_update( + index_elements=["login_email"], + set_={ + "name": stmt.excluded.name, + }, + ) + + result = connection.execute( + stmt.returning( + users_xtra.c.id, + users_xtra.c.name, + sort_by_parameter_order=True, + ), + [ + { + "name": "name1", + "login_email": "user1@example.com", + "lets_index_this": "a", + }, + { + "name": "name2", + "login_email": "user2@example.com", + "lets_index_this": "b", + }, + { + "name": "name3", + "login_email": "user3@example.com", + "lets_index_this": "c", + }, + ], + ) + + # Verify rows are returned in parameter order (names match) + rows = result.all() + eq_([row[1] for row in rows], ["name1", "name2", "name3"]) + # Store IDs for later verification + id1, id2, id3 = [row[0] for row in rows] + + # Verify data was inserted + all_rows = connection.execute( + sql.select(users_xtra.c.id, users_xtra.c.name).order_by( + users_xtra.c.id + ) + ).all() + eq_(all_rows, [(id1, "name1"), (id2, "name2"), (id3, "name3")]) + + # Now update one and insert a new one + + with self.sql_execution_asserter() as asserter: + result = connection.execute( + stmt.returning( + users_xtra.c.id, + users_xtra.c.name, + sort_by_parameter_order=True, + ), + [ + { + "name": "name2_updated", + "login_email": "user2@example.com", + "lets_index_this": "b", + }, + { + "name": "name4", + "login_email": "user4@example.com", + "lets_index_this": "d", + }, + ], + ) + + if testing.against("+psycopg"): + asserter.assert_( + CursorSQL( + "INSERT INTO users_xtra (name, login_email," + " lets_index_this) SELECT p0::VARCHAR, p1::VARCHAR," + " p2::VARCHAR FROM (VALUES (%(name__0)s::VARCHAR," + " %(login_email__0)s::VARCHAR," + " %(lets_index_this__0)s::VARCHAR, 0)," + " (%(name__1)s::VARCHAR, %(login_email__1)s::VARCHAR," + " %(lets_index_this__1)s::VARCHAR, 1)) AS imp_sen(p0, p1," + " p2, sen_counter) ORDER BY sen_counter ON CONFLICT" + " (login_email) DO UPDATE SET name = excluded.name" + " RETURNING users_xtra.id, users_xtra.name, users_xtra.id" + " AS id__1", + { + "name__0": "name2_updated", + "login_email__0": "user2@example.com", + "lets_index_this__0": "b", + "name__1": "name4", + "login_email__1": "user4@example.com", + "lets_index_this__1": "d", + }, + ) + ) + # Verify rows are returned in parameter order + rows = result.all() + eq_([row[1] for row in rows], ["name2_updated", "name4"]) + # First should be update (same ID), second is insert (new ID) + eq_(rows[0][0], id2) + id4 = rows[1][0] + + # Verify final state + eq_( + connection.execute( + sql.select(users_xtra.c.id, users_xtra.c.name).order_by( + users_xtra.c.id + ) + ).all(), + [ + (id1, "name1"), + (id2, "name2_updated"), + (id3, "name3"), + (id4, "name4"), + ], + ) diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py index e362a011b9..be3f880b99 100644 --- a/test/sql/test_insert_exec.py +++ b/test/sql/test_insert_exec.py @@ -1233,6 +1233,7 @@ class IMVSentinelTest(fixtures.TestBase): client_side_pk=False, autoincrement_is_sequence=False, connection=None, + expect_warnings_override=None, ): if connection: dialect = connection.dialect @@ -1240,7 +1241,8 @@ class IMVSentinelTest(fixtures.TestBase): dialect = testing.db.dialect if ( - sort_by_parameter_order + expect_warnings_override is not False + and sort_by_parameter_order and warn_for_downgrades and dialect.use_insertmanyvalues ): @@ -1267,6 +1269,7 @@ class IMVSentinelTest(fixtures.TestBase): and not server_autoincrement and not client_side_pk ) + or (expect_warnings_override is True) ): return expect_warnings( "Batches were downgraded", @@ -2883,6 +2886,122 @@ class IMVSentinelTest(fixtures.TestBase): eq_(coll(result), coll(expected_data)) + @testing.variation( + "sentinel_type", + [ + "use_pk", + "separate_uuid", + "separate_sentinel", + ], + ) + @testing.requires.provisioned_upsert + def test_upsert_autoincrement_downgrades( + self, + metadata, + connection, + sort_by_parameter_order, + randomize_returning, + sentinel_type, + warn_for_downgrades, + ): + pk_col = Column( + "id", Integer, test_needs_autoincrement=True, primary_key=True + ) + + if sentinel_type.separate_uuid: + extra_col = Column( + "sent_col", + Uuid(), + default=uuid.uuid4, + insert_sentinel=True, + nullable=False, + ) + elif sentinel_type.separate_sentinel: + extra_col = insert_sentinel("sent_col") + else: + extra_col = Column("sent_col", Integer) + + t1 = Table( + "upsert_table", + metadata, + pk_col, + Column("otherid", Integer, unique=True), + Column("data", String(50)), + extra_col, + Column( + "has_server_default", + String(30), + server_default="some_server_default", + ), + ) + metadata.create_all(connection) + + result = connection.execute( + insert(t1).returning( + t1.c.id, t1.c.data, sort_by_parameter_order=True + ), + [{"otherid": 1, "data": "d1"}, {"otherid": 2, "data": "d2"}], + ) + + upsert_data = [ + {"otherid": 1, "data": "d1 new"}, + {"otherid": 3, "data": "d10"}, + {"otherid": 4, "data": "d15"}, + {"otherid": 2, "data": "d2 new"}, + {"otherid": 5, "data": "d3"}, + ] + + fixtures.insertmanyvalues_fixture( + connection, + randomize_rows=bool(randomize_returning), + warn_on_downgraded=bool(warn_for_downgrades), + ) + + stmt = provision.upsert( + config, + t1, + (t1.c.data, t1.c.has_server_default), + set_lambda=lambda inserted: { + "data": inserted.data + " upserted", + }, + sort_by_parameter_order=bool(sort_by_parameter_order), + index_elements=["otherid"], + ) + + with self._expect_downgrade_warnings( + warn_for_downgrades=warn_for_downgrades, + sort_by_parameter_order=sort_by_parameter_order, + expect_warnings_override=( + testing.against("mysql", "mariadb", "sqlite") + or (testing.against("postgresql") and not sentinel_type.use_pk) + ), + ): + result = connection.execute(stmt, upsert_data) + + if sentinel_type.use_pk and testing.against("postgresql"): + expected_data = [ + ("d1 new upserted", "some_server_default"), + ("d2 new upserted", "some_server_default"), + ("d10", "some_server_default"), + ("d15", "some_server_default"), + ("d3", "some_server_default"), + ] + else: + expected_data = [ + ("d1 new upserted", "some_server_default"), + ("d10", "some_server_default"), + ("d15", "some_server_default"), + ("d2 new upserted", "some_server_default"), + ("d3", "some_server_default"), + ] + + if sort_by_parameter_order: + coll = list + else: + coll = set + + eq_(coll(result), coll(expected_data)) + def test_auto_downgraded_non_mvi_dialect( self, metadata, -- 2.47.3