--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 13107
+
+ Fixed issue in the :ref:`engine_insertmanyvalues` feature where using
+ PostgreSQL's ``ON CONFLICT`` clause with
+ :paramref:`_dml.Insert.returning.sort_by_parameter_order` enabled would
+ generate invalid SQL when the insert used an implicit sentinel (server-side
+ autoincrement primary key). The generated SQL would incorrectly declare a
+ sentinel counter column in the ``imp_sen`` table alias without providing
+ corresponding values in the ``VALUES`` clause, leading to a
+ ``ProgrammingError`` indicating column count mismatch. The fix allows batch
+ execution mode when ``embed_values_counter`` is active, as the embedded
+ counter provides the ordering capability needed even with upsert behaviors,
+ rather than unnecessarily downgrading to row-at-a-time execution.
@upsert.for_db("mariadb")
def _upsert(
- cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+ cfg,
+ table,
+ returning,
+ *,
+ set_lambda=None,
+ sort_by_parameter_order=False,
+ index_elements=None,
):
from sqlalchemy.dialects.mysql import insert
@upsert.for_db("postgresql")
def _upsert(
- cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+ cfg,
+ table,
+ returning,
+ *,
+ set_lambda=None,
+ sort_by_parameter_order=False,
+ index_elements=None,
):
from sqlalchemy.dialects.postgresql import insert
table_pk = inspect(table).selectable
if set_lambda:
+ if index_elements is None:
+ index_elements = table_pk.primary_key
stmt = stmt.on_conflict_do_update(
- index_elements=table_pk.primary_key, set_=set_lambda(stmt.excluded)
+ index_elements=index_elements, set_=set_lambda(stmt.excluded)
)
else:
stmt = stmt.on_conflict_do_nothing()
@upsert.for_db("sqlite")
def _upsert(
- cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+ cfg,
+ table,
+ returning,
+ *,
+ set_lambda=None,
+ sort_by_parameter_order=False,
+ index_elements=None,
):
from sqlalchemy.dialects.sqlite import insert
elif not self.dialect.supports_multivalues_insert or (
sort_by_parameter_order
and self._result_columns
- and (imv.sentinel_columns is None or imv.includes_upsert_behaviors)
+ and (
+ imv.sentinel_columns is None
+ or (
+ imv.includes_upsert_behaviors
+ and not imv.embed_values_counter
+ )
+ )
):
# deterministic order was requested and the compiler could
# not organize sentinel columns for this dialect/statement.
- # use row at a time
+ # use row at a time. Note: if embed_values_counter is True,
+ # the counter itself provides the ordering capability we need,
+ # so we can use batch mode even with upsert behaviors.
use_row_at_a_time = True
downgraded = True
else:
@register.init
def upsert(
- cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
+ cfg,
+ table,
+ returning,
+ *,
+ set_lambda=None,
+ sort_by_parameter_order=False,
+ index_elements=None,
):
"""return the backends insert..on conflict / on dupe etc. construct.
from sqlalchemy.testing import config
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
+from sqlalchemy.testing.assertions import AssertsExecutionResults
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertsql import CursorSQL
-class OnConflictTest(fixtures.TablesTest):
+class OnConflictTest(fixtures.TablesTest, AssertsExecutionResults):
__only_on__ = ("postgresql >= 9.5",)
__backend__ = True
run_define_tables = "each"
connection.scalar(sql.select(bind_targets.c.data)),
"new updated data processed",
)
+
+ def test_on_conflict_do_update_multirow_returning_ordered(
+ self, connection
+ ):
+ """Test that ON CONFLICT works with multiple rows,
+ RETURNING, and sort_by_parameter_order=True.
+
+ This is a regression test for issue #13107 where the
+ insertmanyvalues sentinel counter was not being added
+ to the VALUES clause when on_conflict_do_update was
+ present with sort_by_parameter_order=True and the
+ primary key was autoincrement (not provided in data).
+ """
+ users_xtra = self.tables.users_xtra
+
+ stmt = insert(users_xtra)
+ stmt = stmt.on_conflict_do_update(
+ index_elements=["login_email"],
+ set_={
+ "name": stmt.excluded.name,
+ },
+ )
+
+ result = connection.execute(
+ stmt.returning(
+ users_xtra.c.id,
+ users_xtra.c.name,
+ sort_by_parameter_order=True,
+ ),
+ [
+ {
+ "name": "name1",
+ "login_email": "user1@example.com",
+ "lets_index_this": "a",
+ },
+ {
+ "name": "name2",
+ "login_email": "user2@example.com",
+ "lets_index_this": "b",
+ },
+ {
+ "name": "name3",
+ "login_email": "user3@example.com",
+ "lets_index_this": "c",
+ },
+ ],
+ )
+
+ # Verify rows are returned in parameter order (names match)
+ rows = result.all()
+ eq_([row[1] for row in rows], ["name1", "name2", "name3"])
+ # Store IDs for later verification
+ id1, id2, id3 = [row[0] for row in rows]
+
+ # Verify data was inserted
+ all_rows = connection.execute(
+ sql.select(users_xtra.c.id, users_xtra.c.name).order_by(
+ users_xtra.c.id
+ )
+ ).all()
+ eq_(all_rows, [(id1, "name1"), (id2, "name2"), (id3, "name3")])
+
+ # Now update one and insert a new one
+
+ with self.sql_execution_asserter() as asserter:
+ result = connection.execute(
+ stmt.returning(
+ users_xtra.c.id,
+ users_xtra.c.name,
+ sort_by_parameter_order=True,
+ ),
+ [
+ {
+ "name": "name2_updated",
+ "login_email": "user2@example.com",
+ "lets_index_this": "b",
+ },
+ {
+ "name": "name4",
+ "login_email": "user4@example.com",
+ "lets_index_this": "d",
+ },
+ ],
+ )
+
+ if testing.against("+psycopg"):
+ asserter.assert_(
+ CursorSQL(
+ "INSERT INTO users_xtra (name, login_email,"
+ " lets_index_this) SELECT p0::VARCHAR, p1::VARCHAR,"
+ " p2::VARCHAR FROM (VALUES (%(name__0)s::VARCHAR,"
+ " %(login_email__0)s::VARCHAR,"
+ " %(lets_index_this__0)s::VARCHAR, 0),"
+ " (%(name__1)s::VARCHAR, %(login_email__1)s::VARCHAR,"
+ " %(lets_index_this__1)s::VARCHAR, 1)) AS imp_sen(p0, p1,"
+ " p2, sen_counter) ORDER BY sen_counter ON CONFLICT"
+ " (login_email) DO UPDATE SET name = excluded.name"
+ " RETURNING users_xtra.id, users_xtra.name, users_xtra.id"
+ " AS id__1",
+ {
+ "name__0": "name2_updated",
+ "login_email__0": "user2@example.com",
+ "lets_index_this__0": "b",
+ "name__1": "name4",
+ "login_email__1": "user4@example.com",
+ "lets_index_this__1": "d",
+ },
+ )
+ )
+ # Verify rows are returned in parameter order
+ rows = result.all()
+ eq_([row[1] for row in rows], ["name2_updated", "name4"])
+ # First should be update (same ID), second is insert (new ID)
+ eq_(rows[0][0], id2)
+ id4 = rows[1][0]
+
+ # Verify final state
+ eq_(
+ connection.execute(
+ sql.select(users_xtra.c.id, users_xtra.c.name).order_by(
+ users_xtra.c.id
+ )
+ ).all(),
+ [
+ (id1, "name1"),
+ (id2, "name2_updated"),
+ (id3, "name3"),
+ (id4, "name4"),
+ ],
+ )
client_side_pk=False,
autoincrement_is_sequence=False,
connection=None,
+ expect_warnings_override=None,
):
if connection:
dialect = connection.dialect
dialect = testing.db.dialect
if (
- sort_by_parameter_order
+ expect_warnings_override is not False
+ and sort_by_parameter_order
and warn_for_downgrades
and dialect.use_insertmanyvalues
):
and not server_autoincrement
and not client_side_pk
)
+ or (expect_warnings_override is True)
):
return expect_warnings(
"Batches were downgraded",
eq_(coll(result), coll(expected_data))
+ @testing.variation(
+ "sentinel_type",
+ [
+ "use_pk",
+ "separate_uuid",
+ "separate_sentinel",
+ ],
+ )
+ @testing.requires.provisioned_upsert
+ def test_upsert_autoincrement_downgrades(
+ self,
+ metadata,
+ connection,
+ sort_by_parameter_order,
+ randomize_returning,
+ sentinel_type,
+ warn_for_downgrades,
+ ):
+ pk_col = Column(
+ "id", Integer, test_needs_autoincrement=True, primary_key=True
+ )
+
+ if sentinel_type.separate_uuid:
+ extra_col = Column(
+ "sent_col",
+ Uuid(),
+ default=uuid.uuid4,
+ insert_sentinel=True,
+ nullable=False,
+ )
+ elif sentinel_type.separate_sentinel:
+ extra_col = insert_sentinel("sent_col")
+ else:
+ extra_col = Column("sent_col", Integer)
+
+ t1 = Table(
+ "upsert_table",
+ metadata,
+ pk_col,
+ Column("otherid", Integer, unique=True),
+ Column("data", String(50)),
+ extra_col,
+ Column(
+ "has_server_default",
+ String(30),
+ server_default="some_server_default",
+ ),
+ )
+ metadata.create_all(connection)
+
+ result = connection.execute(
+ insert(t1).returning(
+ t1.c.id, t1.c.data, sort_by_parameter_order=True
+ ),
+ [{"otherid": 1, "data": "d1"}, {"otherid": 2, "data": "d2"}],
+ )
+
+ upsert_data = [
+ {"otherid": 1, "data": "d1 new"},
+ {"otherid": 3, "data": "d10"},
+ {"otherid": 4, "data": "d15"},
+ {"otherid": 2, "data": "d2 new"},
+ {"otherid": 5, "data": "d3"},
+ ]
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = provision.upsert(
+ config,
+ t1,
+ (t1.c.data, t1.c.has_server_default),
+ set_lambda=lambda inserted: {
+ "data": inserted.data + " upserted",
+ },
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ index_elements=["otherid"],
+ )
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ expect_warnings_override=(
+ testing.against("mysql", "mariadb", "sqlite")
+ or (testing.against("postgresql") and not sentinel_type.use_pk)
+ ),
+ ):
+ result = connection.execute(stmt, upsert_data)
+
+ if sentinel_type.use_pk and testing.against("postgresql"):
+ expected_data = [
+ ("d1 new upserted", "some_server_default"),
+ ("d2 new upserted", "some_server_default"),
+ ("d10", "some_server_default"),
+ ("d15", "some_server_default"),
+ ("d3", "some_server_default"),
+ ]
+ else:
+ expected_data = [
+ ("d1 new upserted", "some_server_default"),
+ ("d10", "some_server_default"),
+ ("d15", "some_server_default"),
+ ("d2 new upserted", "some_server_default"),
+ ("d3", "some_server_default"),
+ ]
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ eq_(coll(result), coll(expected_data))
+
def test_auto_downgraded_non_mvi_dialect(
self,
metadata,