load_options = execution_options.get(
"_sa_orm_load_options", QueryContext.default_load_options
)
+
querycontext = QueryContext(
compile_state.from_statement_ctx,
compile_state.select_statement,
_return_defaults: bool = False
_subject_mapper: Optional[Mapper[Any]] = None
_autoflush: bool = True
+ _populate_existing: bool = False
select_statement: Optional[FromStatement] = None
execution_options,
) = BulkORMInsert.default_insert_options.from_execution_options(
"_sa_orm_insert_options",
- {"dml_strategy", "autoflush"},
+ {"dml_strategy", "autoflush", "populate_existing"},
execution_options,
statement._execution_options,
)
if not bool(statement._returning):
return result
+ if insert_options._populate_existing:
+ load_options = execution_options.get(
+ "_sa_orm_load_options", QueryContext.default_load_options
+ )
+ load_options += {"_populate_existing": True}
+ execution_options = execution_options.union(
+ {"_sa_orm_load_options": load_options}
+ )
+
return cls._return_orm_returning(
session,
statement,
else:
eq_(result.first(), (10, expected_qs[0]))
+ @testing.variation("populate_existing", [True, False])
+ @testing.requires.provisioned_upsert
+ def test_upsert_populate_existing(self, decl_base, populate_existing):
+ """test #9742"""
+
+ class Employee(ComparableEntity, decl_base):
+ __tablename__ = "employee"
+
+ uuid: Mapped[uuid.UUID] = mapped_column(primary_key=True)
+ user_name: Mapped[str] = mapped_column(nullable=False)
+
+ decl_base.metadata.create_all(testing.db)
+ s = fixture_session()
+
+ uuid1 = uuid.uuid4()
+ uuid2 = uuid.uuid4()
+ e1 = Employee(uuid=uuid1, user_name="e1 old name")
+ e2 = Employee(uuid=uuid2, user_name="e2 old name")
+ s.add_all([e1, e2])
+ s.flush()
+
+ stmt = provision.upsert(
+ config,
+ Employee,
+ (Employee,),
+ set_lambda=lambda inserted: {"user_name": inserted.user_name},
+ ).values(
+ [
+ dict(uuid=uuid1, user_name="e1 new name"),
+ dict(uuid=uuid2, user_name="e2 new name"),
+ ]
+ )
+ if populate_existing:
+ rows = s.scalars(
+ stmt, execution_options={"populate_existing": True}
+ )
+ # SPECIAL: before we actually receive the returning rows,
+ # the existing objects have not been updated yet
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+
+ eq_(
+ set(rows),
+ {
+ Employee(uuid=uuid1, user_name="e1 new name"),
+ Employee(uuid=uuid2, user_name="e2 new name"),
+ },
+ )
+
+ # now they are updated
+ eq_(e1.user_name, "e1 new name")
+ eq_(e2.user_name, "e2 new name")
+ else:
+ # no populate existing
+ rows = s.scalars(stmt)
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+ eq_(
+ set(rows),
+ {
+ Employee(uuid=uuid1, user_name="e1 old name"),
+ Employee(uuid=uuid2, user_name="e2 old name"),
+ },
+ )
+ eq_(e1.user_name, "e1 old name")
+ eq_(e2.user_name, "e2 old name")
+ s.commit()
+ s.expire_all()
+ eq_(e1.user_name, "e1 new name")
+ eq_(e2.user_name, "e2 new name")
+
class UpdateStmtTest(fixtures.TestBase):
__backend__ = True