git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
allow batch with upsert if embed_values_counter is True
author: Mike Bayer <mike_mp@zzzcomputing.com>
Wed, 4 Feb 2026 02:07:59 +0000 (21:07 -0500)
committer: Mike Bayer <mike_mp@zzzcomputing.com>
Thu, 5 Feb 2026 15:09:09 +0000 (10:09 -0500)
Fixed issue in the :ref:`engine_insertmanyvalues` feature where using
PostgreSQL's ``ON CONFLICT`` clause with
:paramref:`_dml.Insert.returning.sort_by_parameter_order` enabled would
generate invalid SQL when the insert used an implicit sentinel (server-side
autoincrement primary key). The generated SQL would incorrectly declare a
sentinel counter column in the ``imp_sen`` table alias without providing
corresponding values in the ``VALUES`` clause, leading to a
``ProgrammingError`` indicating a column count mismatch. The fix allows batch
execution mode when ``embed_values_counter`` is active, as the embedded
counter provides the ordering capability needed even with upsert behaviors,
rather than unnecessarily downgrading to row-at-a-time execution.

Fixes: #13107
Change-Id: I382472b2cf2991b520344adea5783584e27425d0

doc/build/changelog/unreleased_20/13107.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mysql/provision.py
lib/sqlalchemy/dialects/postgresql/provision.py
lib/sqlalchemy/dialects/sqlite/provision.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/testing/provision.py
test/dialect/postgresql/test_on_conflict.py
test/sql/test_insert_exec.py

diff --git a/doc/build/changelog/unreleased_20/13107.rst b/doc/build/changelog/unreleased_20/13107.rst
new file mode 100644 (file)
index 0000000..0d877d9
--- /dev/null
@@ -0,0 +1,15 @@
+.. change::
+    :tags: bug, postgresql
+    :tickets: 13107
+
+    Fixed issue in the :ref:`engine_insertmanyvalues` feature where using
+    PostgreSQL's ``ON CONFLICT`` clause with
+    :paramref:`_dml.Insert.returning.sort_by_parameter_order` enabled would
+    generate invalid SQL when the insert used an implicit sentinel (server-side
+    autoincrement primary key). The generated SQL would incorrectly declare a
+    sentinel counter column in the ``imp_sen`` table alias without providing
+    corresponding values in the ``VALUES`` clause, leading to a
+    ``ProgrammingError`` indicating a column count mismatch. The fix allows batch
+    execution mode when ``embed_values_counter`` is active, as the embedded
+    counter provides the ordering capability needed even with upsert behaviors,
+    rather than unnecessarily downgrading to row-at-a-time execution.
index e828cdcfab476132679ede7bfd87c33d0ef8fb33..46cd8abf7def5b6616bba86a511d261c972707b4 100644 (file)
@@ -100,7 +100,13 @@ def _mysql_temp_table_keyword_args(cfg, eng):
 
 @upsert.for_db("mariadb")
 def _upsert(
-    cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+    cfg,
+    table,
+    returning,
+    *,
+    set_lambda=None,
+    sort_by_parameter_order=False,
+    index_elements=None,
 ):
     from sqlalchemy.dialects.mysql import insert
 
index 2ea7e974f4c24620730dca1bec247ed3445ef77d..dfe0f627d2e7c0fe41bcdd7633d19c3b1403e6df 100644 (file)
@@ -137,7 +137,13 @@ def prepare_for_drop_tables(config, connection):
 
 @upsert.for_db("postgresql")
 def _upsert(
-    cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+    cfg,
+    table,
+    returning,
+    *,
+    set_lambda=None,
+    sort_by_parameter_order=False,
+    index_elements=None,
 ):
     from sqlalchemy.dialects.postgresql import insert
 
@@ -146,8 +152,10 @@ def _upsert(
     table_pk = inspect(table).selectable
 
     if set_lambda:
+        if index_elements is None:
+            index_elements = table_pk.primary_key
         stmt = stmt.on_conflict_do_update(
-            index_elements=table_pk.primary_key, set_=set_lambda(stmt.excluded)
+            index_elements=index_elements, set_=set_lambda(stmt.excluded)
         )
     else:
         stmt = stmt.on_conflict_do_nothing()
index 7cd8847842cb1bde71fcf56a2c1d3a96a40399e2..fa63ab5ed97e8403899c32bf48bbb2af153535db 100644 (file)
@@ -206,7 +206,13 @@ def _reap_sqlite_dbs(url, idents):
 
 @upsert.for_db("sqlite")
 def _upsert(
-    cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+    cfg,
+    table,
+    returning,
+    *,
+    set_lambda=None,
+    sort_by_parameter_order=False,
+    index_elements=None,
 ):
     from sqlalchemy.dialects.sqlite import insert
 
index 31c8ac8b8a4498fbec9ad1d006e25296128df471..4fc2d2aa366affeb4610fc5430684f467b96e5cc 100644 (file)
@@ -5735,11 +5735,19 @@ class SQLCompiler(Compiled):
         elif not self.dialect.supports_multivalues_insert or (
             sort_by_parameter_order
             and self._result_columns
-            and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
+            and (
+                imv.sentinel_columns is None
+                or (
+                    imv.includes_upsert_behaviors
+                    and not imv.embed_values_counter
+                )
+            )
         ):
             # deterministic order was requested and the compiler could
             # not organize sentinel columns for this dialect/statement.
-            # use row at a time
+            # use row at a time.  Note: if embed_values_counter is True,
+            # the counter itself provides the ordering capability we need,
+            # so we can use batch mode even with upsert behaviors.
             use_row_at_a_time = True
             downgraded = True
         else:
index 1f88686e6662194187d3b423d73a37316e96a9a2..97d64ceb1eaf71636b46e757689713c0f16c2d62 100644 (file)
@@ -517,7 +517,13 @@ def set_default_schema_on_connection(cfg, dbapi_connection, schema_name):
 
 @register.init
 def upsert(
-    cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+    cfg,
+    table,
+    returning,
+    *,
+    set_lambda=None,
+    sort_by_parameter_order=False,
+    index_elements=None,
 ):
     """return the backends insert..on conflict / on dupe etc. construct.
 
index 691f6c3962035bdd389eb1a8073c86e9a91fa979..7e71ac72d57c071e8adeef44b5d301622e8600f2 100644 (file)
@@ -11,10 +11,12 @@ from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.testing import config
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing.assertions import assert_raises
+from sqlalchemy.testing.assertions import AssertsExecutionResults
 from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertsql import CursorSQL
 
 
-class OnConflictTest(fixtures.TablesTest):
+class OnConflictTest(fixtures.TablesTest, AssertsExecutionResults):
     __only_on__ = ("postgresql >= 9.5",)
     __backend__ = True
     run_define_tables = "each"
@@ -774,3 +776,133 @@ class OnConflictTest(fixtures.TablesTest):
             connection.scalar(sql.select(bind_targets.c.data)),
             "new updated data processed",
         )
+
+    def test_on_conflict_do_update_multirow_returning_ordered(
+        self, connection
+    ):
+        """Test that ON CONFLICT works with multiple rows,
+        RETURNING, and sort_by_parameter_order=True.
+
+        This is a regression test for issue #13107 where the
+        insertmanyvalues sentinel counter was not being added
+        to the VALUES clause when on_conflict_do_update was
+        present with sort_by_parameter_order=True and the
+        primary key was autoincrement (not provided in data).
+        """
+        users_xtra = self.tables.users_xtra
+
+        stmt = insert(users_xtra)
+        stmt = stmt.on_conflict_do_update(
+            index_elements=["login_email"],
+            set_={
+                "name": stmt.excluded.name,
+            },
+        )
+
+        result = connection.execute(
+            stmt.returning(
+                users_xtra.c.id,
+                users_xtra.c.name,
+                sort_by_parameter_order=True,
+            ),
+            [
+                {
+                    "name": "name1",
+                    "login_email": "user1@example.com",
+                    "lets_index_this": "a",
+                },
+                {
+                    "name": "name2",
+                    "login_email": "user2@example.com",
+                    "lets_index_this": "b",
+                },
+                {
+                    "name": "name3",
+                    "login_email": "user3@example.com",
+                    "lets_index_this": "c",
+                },
+            ],
+        )
+
+        # Verify rows are returned in parameter order (names match)
+        rows = result.all()
+        eq_([row[1] for row in rows], ["name1", "name2", "name3"])
+        # Store IDs for later verification
+        id1, id2, id3 = [row[0] for row in rows]
+
+        # Verify data was inserted
+        all_rows = connection.execute(
+            sql.select(users_xtra.c.id, users_xtra.c.name).order_by(
+                users_xtra.c.id
+            )
+        ).all()
+        eq_(all_rows, [(id1, "name1"), (id2, "name2"), (id3, "name3")])
+
+        # Now update one and insert a new one
+
+        with self.sql_execution_asserter() as asserter:
+            result = connection.execute(
+                stmt.returning(
+                    users_xtra.c.id,
+                    users_xtra.c.name,
+                    sort_by_parameter_order=True,
+                ),
+                [
+                    {
+                        "name": "name2_updated",
+                        "login_email": "user2@example.com",
+                        "lets_index_this": "b",
+                    },
+                    {
+                        "name": "name4",
+                        "login_email": "user4@example.com",
+                        "lets_index_this": "d",
+                    },
+                ],
+            )
+
+        if testing.against("+psycopg"):
+            asserter.assert_(
+                CursorSQL(
+                    "INSERT INTO users_xtra (name, login_email,"
+                    " lets_index_this) SELECT p0::VARCHAR, p1::VARCHAR,"
+                    " p2::VARCHAR FROM (VALUES (%(name__0)s::VARCHAR,"
+                    " %(login_email__0)s::VARCHAR,"
+                    " %(lets_index_this__0)s::VARCHAR, 0),"
+                    " (%(name__1)s::VARCHAR, %(login_email__1)s::VARCHAR,"
+                    " %(lets_index_this__1)s::VARCHAR, 1)) AS imp_sen(p0, p1,"
+                    " p2, sen_counter) ORDER BY sen_counter ON CONFLICT"
+                    " (login_email) DO UPDATE SET name = excluded.name"
+                    " RETURNING users_xtra.id, users_xtra.name, users_xtra.id"
+                    " AS id__1",
+                    {
+                        "name__0": "name2_updated",
+                        "login_email__0": "user2@example.com",
+                        "lets_index_this__0": "b",
+                        "name__1": "name4",
+                        "login_email__1": "user4@example.com",
+                        "lets_index_this__1": "d",
+                    },
+                )
+            )
+        # Verify rows are returned in parameter order
+        rows = result.all()
+        eq_([row[1] for row in rows], ["name2_updated", "name4"])
+        # First should be update (same ID), second is insert (new ID)
+        eq_(rows[0][0], id2)
+        id4 = rows[1][0]
+
+        # Verify final state
+        eq_(
+            connection.execute(
+                sql.select(users_xtra.c.id, users_xtra.c.name).order_by(
+                    users_xtra.c.id
+                )
+            ).all(),
+            [
+                (id1, "name1"),
+                (id2, "name2_updated"),
+                (id3, "name3"),
+                (id4, "name4"),
+            ],
+        )
index e362a011b9c025b3f30dc534e52d866892f83cd3..be3f880b99f1429242a05b750c085f8e851ceffd 100644 (file)
@@ -1233,6 +1233,7 @@ class IMVSentinelTest(fixtures.TestBase):
         client_side_pk=False,
         autoincrement_is_sequence=False,
         connection=None,
+        expect_warnings_override=None,
     ):
         if connection:
             dialect = connection.dialect
@@ -1240,7 +1241,8 @@ class IMVSentinelTest(fixtures.TestBase):
             dialect = testing.db.dialect
 
         if (
-            sort_by_parameter_order
+            expect_warnings_override is not False
+            and sort_by_parameter_order
             and warn_for_downgrades
             and dialect.use_insertmanyvalues
         ):
@@ -1267,6 +1269,7 @@ class IMVSentinelTest(fixtures.TestBase):
                     and not server_autoincrement
                     and not client_side_pk
                 )
+                or (expect_warnings_override is True)
             ):
                 return expect_warnings(
                     "Batches were downgraded",
@@ -2883,6 +2886,122 @@ class IMVSentinelTest(fixtures.TestBase):
 
         eq_(coll(result), coll(expected_data))
 
+    @testing.variation(
+        "sentinel_type",
+        [
+            "use_pk",
+            "separate_uuid",
+            "separate_sentinel",
+        ],
+    )
+    @testing.requires.provisioned_upsert
+    def test_upsert_autoincrement_downgrades(
+        self,
+        metadata,
+        connection,
+        sort_by_parameter_order,
+        randomize_returning,
+        sentinel_type,
+        warn_for_downgrades,
+    ):
+        pk_col = Column(
+            "id", Integer, test_needs_autoincrement=True, primary_key=True
+        )
+
+        if sentinel_type.separate_uuid:
+            extra_col = Column(
+                "sent_col",
+                Uuid(),
+                default=uuid.uuid4,
+                insert_sentinel=True,
+                nullable=False,
+            )
+        elif sentinel_type.separate_sentinel:
+            extra_col = insert_sentinel("sent_col")
+        else:
+            extra_col = Column("sent_col", Integer)
+
+        t1 = Table(
+            "upsert_table",
+            metadata,
+            pk_col,
+            Column("otherid", Integer, unique=True),
+            Column("data", String(50)),
+            extra_col,
+            Column(
+                "has_server_default",
+                String(30),
+                server_default="some_server_default",
+            ),
+        )
+        metadata.create_all(connection)
+
+        result = connection.execute(
+            insert(t1).returning(
+                t1.c.id, t1.c.data, sort_by_parameter_order=True
+            ),
+            [{"otherid": 1, "data": "d1"}, {"otherid": 2, "data": "d2"}],
+        )
+
+        upsert_data = [
+            {"otherid": 1, "data": "d1 new"},
+            {"otherid": 3, "data": "d10"},
+            {"otherid": 4, "data": "d15"},
+            {"otherid": 2, "data": "d2 new"},
+            {"otherid": 5, "data": "d3"},
+        ]
+
+        fixtures.insertmanyvalues_fixture(
+            connection,
+            randomize_rows=bool(randomize_returning),
+            warn_on_downgraded=bool(warn_for_downgrades),
+        )
+
+        stmt = provision.upsert(
+            config,
+            t1,
+            (t1.c.data, t1.c.has_server_default),
+            set_lambda=lambda inserted: {
+                "data": inserted.data + " upserted",
+            },
+            sort_by_parameter_order=bool(sort_by_parameter_order),
+            index_elements=["otherid"],
+        )
+
+        with self._expect_downgrade_warnings(
+            warn_for_downgrades=warn_for_downgrades,
+            sort_by_parameter_order=sort_by_parameter_order,
+            expect_warnings_override=(
+                testing.against("mysql", "mariadb", "sqlite")
+                or (testing.against("postgresql") and not sentinel_type.use_pk)
+            ),
+        ):
+            result = connection.execute(stmt, upsert_data)
+
+        if sentinel_type.use_pk and testing.against("postgresql"):
+            expected_data = [
+                ("d1 new upserted", "some_server_default"),
+                ("d2 new upserted", "some_server_default"),
+                ("d10", "some_server_default"),
+                ("d15", "some_server_default"),
+                ("d3", "some_server_default"),
+            ]
+        else:
+            expected_data = [
+                ("d1 new upserted", "some_server_default"),
+                ("d10", "some_server_default"),
+                ("d15", "some_server_default"),
+                ("d2 new upserted", "some_server_default"),
+                ("d3", "some_server_default"),
+            ]
+
+        if sort_by_parameter_order:
+            coll = list
+        else:
+            coll = set
+
+        eq_(coll(result), coll(expected_data))
+
     def test_auto_downgraded_non_mvi_dialect(
         self,
         metadata,