]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
include pk cols in refresh() if relationships are requested
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 16 Dec 2022 20:44:32 +0000 (15:44 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Dec 2022 15:19:52 +0000 (10:19 -0500)
A series of changes and improvements regarding
:meth:`_orm.Session.refresh`. The overall change is that primary key
attributes for an object are now included in a refresh operation
unconditionally when relationship-bound attributes are to be refreshed,
even if not expired and even if not specified in the refresh.

* Improved :meth:`_orm.Session.refresh` so that if autoflush is enabled
(as is the default for :class:`_orm.Session`), the autoflush takes place
at an earlier part of the refresh process so that pending primary key
changes are applied without errors being raised.  Previously, this
autoflush took place too late in the process and the SELECT statement
would not use the correct key to locate the row and an
:class:`.InvalidRequestError` would be raised.

* When the above condition is present, that is, unflushed primary key
changes are present on the object, but autoflush is not enabled,
the refresh() method now explicitly disallows the operation to proceed,
and an informative :class:`.InvalidRequestError` is raised asking that
the pending primary key changes be flushed first.  Previously,
this use case was simply broken and :class:`.InvalidRequestError`
would be raised anyway. This restriction is so that it's safe for the
primary key attributes to be refreshed, as is necessary for the case of
being able to refresh the object with relationship-bound secondary
eagerloaders also being emitted. This rule applies in all cases to keep
API behavior consistent regardless of whether or not the PK cols are
actually needed in the refresh, as it is unusual to be refreshing
some attributes on an object while keeping other attributes "pending"
in any case.

* The :meth:`_orm.Session.refresh` method has been enhanced such that
attributes which are :func:`_orm.relationship`-bound and linked to an
eager loader, either at mapping time or via last-used loader options,
will be refreshed in all cases even when a list of attributes is passed
that does not include any columns on the parent row. This builds upon the
feature first implemented for non-column attributes as part of
:ticket:`1763` fixed in 1.4 allowing eagerly-loaded relationship-bound
attributes to participate in the :meth:`_orm.Session.refresh` operation.
If the refresh operation does not indicate any columns on the parent row
to be refreshed, the primary key columns will nonetheless be included
in the refresh operation, which allows the load to proceed into the
secondary relationship loaders indicated as it does normally.
Previously an :class:`.InvalidRequestError` error would be raised
for this condition (:ticket:`8703`)

* Fixed issue where an unnecessary additional SELECT would be emitted in
the case where :meth:`_orm.Session.refresh` were called with a
combination of expired attributes, as well as an eager loader such as
:func:`_orm.selectinload` that emits a "secondary" query, if the primary
key attributes were also in an expired state.  As the primary key
attributes are now included in the refresh automatically, there is no
additional load for these attributes when a relationship loader
goes to select for them (:ticket:`8997`)

* Fixed regression caused by :ticket:`8126` released in 2.0.0b1 where the
:meth:`_orm.Session.refresh` method would fail with an
``AttributeError``, if passed both an expired column name as well as the
name of a relationship-bound attribute that was linked to a "secondary"
eagerloader such as the :func:`_orm.selectinload` eager loader
(:ticket:`8996`)

Fixes: #8703
Fixes: #8996
Fixes: #8997
Fixes: #8126
Change-Id: I88dcbc0a9a8337f6af0bc4bcc5b0261819acd1c4

doc/build/changelog/unreleased_20/8703.rst [new file with mode: 0644]
lib/sqlalchemy/orm/context.py
lib/sqlalchemy/orm/loading.py
lib/sqlalchemy/orm/session.py
lib/sqlalchemy/orm/strategies.py
test/orm/inheritance/test_basic.py
test/orm/test_expire.py

diff --git a/doc/build/changelog/unreleased_20/8703.rst b/doc/build/changelog/unreleased_20/8703.rst
new file mode 100644 (file)
index 0000000..78b29e5
--- /dev/null
@@ -0,0 +1,63 @@
+.. change::
+    :tags: bug, orm
+    :tickets: 8703, 8997, 8996
+
+    A series of changes and improvements regarding
+    :meth:`_orm.Session.refresh`. The overall change is that primary key
+    attributes for an object are now included in a refresh operation
+    unconditionally when relationship-bound attributes are to be refreshed,
+    even if not expired and even if not specified in the refresh.
+
+    * Improved :meth:`_orm.Session.refresh` so that if autoflush is enabled
+      (as is the default for :class:`_orm.Session`), the autoflush takes place
+      at an earlier part of the refresh process so that pending primary key
+      changes are applied without errors being raised.  Previously, this
+      autoflush took place too late in the process and the SELECT statement
+      would not use the correct key to locate the row and an
+      :class:`.InvalidRequestError` would be raised.
+
+    * When the above condition is present, that is, unflushed primary key
+      changes are present on the object, but autoflush is not enabled,
+      the refresh() method now explicitly disallows the operation to proceed,
+      and an informative :class:`.InvalidRequestError` is raised asking that
+      the pending primary key changes be flushed first.  Previously,
+      this use case was simply broken and :class:`.InvalidRequestError`
+      would be raised anyway. This restriction is so that it's safe for the
+      primary key attributes to be refreshed, as is necessary for the case of
+      being able to refresh the object with relationship-bound secondary
+      eagerloaders also being emitted. This rule applies in all cases to keep
+      API behavior consistent regardless of whether or not the PK cols are
+      actually needed in the refresh, as it is unusual to be refreshing
+      some attributes on an object while keeping other attributes "pending"
+      in any case.
+
+    * The :meth:`_orm.Session.refresh` method has been enhanced such that
+      attributes which are :func:`_orm.relationship`-bound and linked to an
+      eager loader, either at mapping time or via last-used loader options,
+      will be refreshed in all cases even when a list of attributes is passed
+      that does not include any columns on the parent row. This builds upon the
+      feature first implemented for non-column attributes as part of
+      :ticket:`1763` fixed in 1.4 allowing eagerly-loaded relationship-bound
+      attributes to participate in the :meth:`_orm.Session.refresh` operation.
+      If the refresh operation does not indicate any columns on the parent row
+      to be refreshed, the primary key columns will nonetheless be included
+      in the refresh operation, which allows the load to proceed into the
+      secondary relationship loaders indicated as it does normally.
+      Previously an :class:`.InvalidRequestError` error would be raised
+      for this condition (:ticket:`8703`)
+
+    * Fixed issue where an unnecessary additional SELECT would be emitted in
+      the case where :meth:`_orm.Session.refresh` were called with a
+      combination of expired attributes, as well as an eager loader such as
+      :func:`_orm.selectinload` that emits a "secondary" query, if the primary
+      key attributes were also in an expired state.  As the primary key
+      attributes are now included in the refresh automatically, there is no
+      additional load for these attributes when a relationship loader
+      goes to select for them (:ticket:`8997`)
+
+    * Fixed regression caused by :ticket:`8126` released in 2.0.0b1 where the
+      :meth:`_orm.Session.refresh` method would fail with an
+      ``AttributeError``, if passed both an expired column name as well as the
+      name of a relationship-bound attribute that was linked to a "secondary"
+      eagerloader such as the :func:`_orm.selectinload` eager loader
+      (:ticket:`8996`)
index b3c8e78b337630b0578d0132135059760139e26d..3bd8b02a71be5bc5909f84e0a88acbb41a1facc6 100644 (file)
@@ -1188,15 +1188,11 @@ class ORMSelectCompileState(ORMCompileState, SelectState):
 
         if not self.primary_columns:
             if self.compile_options._only_load_props:
-                raise sa_exc.InvalidRequestError(
-                    "No column-based properties specified for "
-                    "refresh operation. Use session.expire() "
-                    "to reload collections and related items."
-                )
-            else:
-                raise sa_exc.InvalidRequestError(
-                    "Query contains no columns with which to SELECT from."
-                )
+                assert False, "no columns were included in _only_load_props"
+
+            raise sa_exc.InvalidRequestError(
+                "Query contains no columns with which to SELECT from."
+            )
 
         if not self.from_clauses:
             self.from_clauses = list(self._fallback_from_clauses)
index edfa61287fa793065e4bf5ae81d23e2569a048c1..6e7695f8610a2f08b358cf916b939e3d3d75dce2 100644 (file)
@@ -40,12 +40,12 @@ from .context import FromStatement
 from .util import _none_set
 from .util import state_str
 from .. import exc as sa_exc
-from .. import future
 from .. import util
 from ..engine import result_tuple
 from ..engine.result import ChunkedIteratorResult
 from ..engine.result import FrozenResult
 from ..engine.result import SimpleResultMetaData
+from ..sql import select
 from ..sql import util as sql_util
 from ..sql.selectable import ForUpdateArg
 from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
@@ -468,6 +468,7 @@ def load_on_ident(
     no_autoflush: bool = False,
     bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
     execution_options: _ExecuteOptions = util.EMPTY_DICT,
+    require_pk_cols: bool = False,
 ):
     """Load the given identity key from the database."""
     if key is not None:
@@ -488,6 +489,7 @@ def load_on_ident(
         no_autoflush=no_autoflush,
         bind_arguments=bind_arguments,
         execution_options=execution_options,
+        require_pk_cols=require_pk_cols,
     )
 
 
@@ -504,6 +506,7 @@ def load_on_pk_identity(
     no_autoflush: bool = False,
     bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
     execution_options: _ExecuteOptions = util.EMPTY_DICT,
+    require_pk_cols: bool = False,
 ):
 
     """Load the given primary key identity from the database."""
@@ -572,6 +575,71 @@ def load_on_pk_identity(
     else:
         version_check = False
 
+    if require_pk_cols and only_load_props:
+        if not refresh_state:
+            raise sa_exc.ArgumentError(
+                "refresh_state is required when require_pk_cols is present"
+            )
+
+        refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
+        has_changes = {
+            key
+            for key in refresh_state_prokeys.difference(only_load_props)
+            if refresh_state.attrs[key].history.has_changes()
+        }
+        if has_changes:
+            # raise if pending pk changes are present.
+            # technically, this could be limited to the case where we have
+            # relationships in the only_load_props collection to be refreshed
+            # also (and only ones that have a secondary eager loader, at that).
+            # however, the error is in place across the board so that behavior
+            # here is easier to predict.   The use case it prevents is one
+            # of mutating PK attrs, leaving them unflushed,
+            # calling session.refresh(), and expecting those attrs to remain
+            # still unflushed.   It seems likely someone doing all those
+            # things would be better off having the PK attributes flushed
+            # to the database before tinkering like that (session.refresh() is
+            # tinkering).
+            raise sa_exc.InvalidRequestError(
+                f"Please flush pending primary key changes on "
+                "attributes "
+                f"{has_changes} for mapper {refresh_state.mapper} before "
+                "proceeding with a refresh"
+            )
+
+        # overall, the ORM has no internal flow right now for "dont load the
+        # primary row of an object at all, but fire off
+        # selectinload/subqueryload/immediateload for some relationships".
+        # It would probably be a pretty big effort to add such a flow.  So
+        # here, the case for #8703 is introduced; user asks to refresh some
+        # relationship attributes only which are
+        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
+        # ORM complains there's no columns in the primary row to load.
+        # So here, we just add the PK cols if that
+        # case is detected, so that there is a SELECT emitted for the primary
+        # row.
+        #
+        # Let's just state right up front, for this one little case,
+        # the ORM here is adding a whole extra SELECT just to satisfy
+        # limitations in the internal flow.  This is really not a thing
+        # SQLAlchemy finds itself doing like, ever, obviously, we are
+        # constantly working to *remove* SELECTs we don't need.   We
+        # rationalize this for now based on 1. session.refresh() is not
+        # commonly used 2. session.refresh() with only relationship attrs is
+        # even less commonly used 3. the SELECT in question is very low
+        # latency.
+        #
+        # to add the flow to not include the SELECT, the quickest way
+        # might be to just manufacture a single-row result set to send off to
+        # instances(), but we'd have to weave that into context.py and all
+        # that.  For 2.0.0, we have enough big changes to navigate for now.
+        #
+        mp = refresh_state.mapper._props
+        for p in only_load_props:
+            if mp[p]._is_relationship:
+                only_load_props = refresh_state_prokeys.union(only_load_props)
+                break
+
     if refresh_state and refresh_state.load_options:
         compile_options += {"_current_path": refresh_state.load_path.parent}
         q = q.options(*refresh_state.load_options)
@@ -584,6 +652,7 @@ def load_on_pk_identity(
         refresh_state=refresh_state,
         identity_token=identity_token,
     )
+
     q._compile_options = new_compile_options
     q._order_by = None
 
@@ -1456,10 +1525,6 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
             "attribute refresh operation cannot proceed" % (state_str(state))
         )
 
-    has_key = bool(state.key)
-
-    result = False
-
     no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
 
     # in the case of inheritance, particularly concrete and abstract
@@ -1472,16 +1537,17 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
         attribute_names = attribute_names.intersection(mapper.attrs.keys())
 
     if mapper.inherits and not mapper.concrete:
+        # load based on committed attributes in the object, formed into
+        # a truncated SELECT that only includes relevant tables.  does not
+        # currently use state.key
         statement = mapper._optimized_get_statement(state, attribute_names)
         if statement is not None:
 
-            from .query import FromStatement
-
             # undefer() isn't needed here because statement has the
             # columns needed already, this implicitly undefers that column
             stmt = FromStatement(mapper, statement)
 
-            result = load_on_ident(
+            return load_on_ident(
                 session,
                 stmt,
                 None,
@@ -1490,46 +1556,46 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
                 no_autoflush=no_autoflush,
             )
 
-    if result is False:
-        if has_key:
-            identity_key = state.key
-        else:
-            # this codepath is rare - only valid when inside a flush, and the
-            # object is becoming persistent but hasn't yet been assigned
-            # an identity_key.
-            # check here to ensure we have the attrs we need.
-            pk_attrs = [
-                mapper._columntoproperty[col].key for col in mapper.primary_key
-            ]
-            if state.expired_attributes.intersection(pk_attrs):
-                raise sa_exc.InvalidRequestError(
-                    "Instance %s cannot be refreshed - it's not "
-                    " persistent and does not "
-                    "contain a full primary key." % state_str(state)
-                )
-            identity_key = mapper._identity_key_from_state(state)
-
-        if (
-            _none_set.issubset(identity_key) and not mapper.allow_partial_pks
-        ) or _none_set.issuperset(identity_key):
-            util.warn_limited(
-                "Instance %s to be refreshed doesn't "
-                "contain a full primary key - can't be refreshed "
-                "(and shouldn't be expired, either).",
-                state_str(state),
+    # normal load, use state.key as the identity to SELECT
+    has_key = bool(state.key)
+
+    if has_key:
+        identity_key = state.key
+    else:
+        # this codepath is rare - only valid when inside a flush, and the
+        # object is becoming persistent but hasn't yet been assigned
+        # an identity_key.
+        # check here to ensure we have the attrs we need.
+        pk_attrs = [
+            mapper._columntoproperty[col].key for col in mapper.primary_key
+        ]
+        if state.expired_attributes.intersection(pk_attrs):
+            raise sa_exc.InvalidRequestError(
+                "Instance %s cannot be refreshed - it's not "
+                " persistent and does not "
+                "contain a full primary key." % state_str(state)
             )
-            return
+        identity_key = mapper._identity_key_from_state(state)
 
-        result = load_on_ident(
-            session,
-            future.select(mapper).set_label_style(
-                LABEL_STYLE_TABLENAME_PLUS_COL
-            ),
-            identity_key,
-            refresh_state=state,
-            only_load_props=attribute_names,
-            no_autoflush=no_autoflush,
+    if (
+        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
+    ) or _none_set.issuperset(identity_key):
+        util.warn_limited(
+            "Instance %s to be refreshed doesn't "
+            "contain a full primary key - can't be refreshed "
+            "(and shouldn't be expired, either).",
+            state_str(state),
         )
+        return
+
+    result = load_on_ident(
+        session,
+        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
+        identity_key,
+        refresh_state=state,
+        only_load_props=attribute_names,
+        no_autoflush=no_autoflush,
+    )
 
     # if instance is pending, a refresh operation
     # may not complete (even if PK attributes are assigned)
index 1f7bd6d73af8044a4d118244c7a8995f2be7b055..bf3df05990699e5d8181227dc041b8e5a43e88b0 100644 (file)
@@ -2817,6 +2817,14 @@ class Session(_SessionClassMethods, EventTarget):
 
         self._expire_state(state, attribute_names)
 
+        # this autoflush previously used to occur as a secondary effect
+        # of the load_on_ident below.   Meaning we'd organize the SELECT
+        # based on current DB pks, then flush, then if pks changed in that
+        # flush, crash.  this was unticketed but discovered as part of
+        # #8703.  So here, autoflush up front, dont autoflush inside
+        # load_on_ident.
+        self._autoflush()
+
         if with_for_update == {}:
             raise sa_exc.ArgumentError(
                 "with_for_update should be the boolean value "
@@ -2835,6 +2843,11 @@ class Session(_SessionClassMethods, EventTarget):
                 refresh_state=state,
                 with_for_update=with_for_update,
                 only_load_props=attribute_names,
+                require_pk_cols=True,
+                # technically unnecessary as we just did autoflush
+                # above, however removes the additional unnecessary
+                # call to _autoflush()
+                no_autoflush=True,
             )
             is None
         ):
index 59031171da7a8f7ff281900b9f03c3496da66380..354c65828f74737f0bca4911ee27d9905bb67d0e 100644 (file)
@@ -32,6 +32,7 @@ from . import util as orm_util
 from .base import _DEFER_FOR_STATE
 from .base import _RAISE_FOR_STATE
 from .base import _SET_DEFERRED_EXPIRED
+from .base import ATTR_WAS_SET
 from .base import LoaderCallableStatus
 from .base import PASSIVE_OFF
 from .base import PassiveFlag
@@ -1422,7 +1423,10 @@ class ImmediateLoader(PostLoader):
                     alternate_effective_path=alternate_effective_path,
                     execution_options=execution_options,
                 )
-                state.get_impl(key).set_committed_value(state, dict_, value)
+                if value is not ATTR_WAS_SET:
+                    state.get_impl(key).set_committed_value(
+                        state, dict_, value
+                    )
 
 
 @log.class_logger
index 5803d51bc087cc52bcbcfc0435463a5131bfd222..d231bba63492e0102b2b1470ce2738dee9ea1354 100644 (file)
@@ -1902,6 +1902,41 @@ class OptimizedGetOnDeferredTest(fixtures.MappedTest):
             )
         )
 
+    def test_refresh_column(self):
+        """refresh currently does not use the mapper "optimized get".
+
+        This could be added later by generalizing the code in
+        loading.py->load_scalar_attributes() to be used by session.refresh().
+
+        For #8703, where we are revisiting some of this logic for 2.0.0,
+        not doing this yet as enough is changing in 2.0 already.
+
+        """
+        A, B = self.classes("A", "B")
+        sess = fixture_session()
+        b1 = B(data="x")
+        sess.add(b1)
+        sess.flush()
+        pk = b1.id
+        sess.expire(b1, ["data"])
+
+        with self.sql_execution_asserter(testing.db) as asserter:
+            sess.refresh(b1, ["data"])
+
+        asserter.assert_(
+            CompiledSQL(
+                # full statement that has a JOIN in it.  Note that
+                # a.id is not included in the SELECT list
+                "SELECT b.data FROM a JOIN b ON a.id = b.id "
+                "WHERE a.id = :pk_1",
+                [{"pk_1": pk}]
+                # if we used load_scalar_attributes(), it would look like
+                # this
+                # "SELECT b.data AS b_data FROM b WHERE :param_1 = b.id",
+                # [{"param_1": b_id}],
+            )
+        )
+
     def test_load_from_unloaded_subclass(self):
         A, B = self.classes("A", "B")
         sess = fixture_session()
index f851f3698c07e0677a539e4bfefed9317a42e8ad..d2d3d3aa4dd5df18a037bd2d1d9de5933e90dacf 100644 (file)
@@ -1,5 +1,7 @@
 """Attribute/instance expiration, deferral of attributes, etc."""
 
+import re
+
 import sqlalchemy as sa
 from sqlalchemy import exc as sa_exc
 from sqlalchemy import FetchedValue
@@ -13,12 +15,15 @@ from sqlalchemy.orm import contains_eager
 from sqlalchemy.orm import defer
 from sqlalchemy.orm import deferred
 from sqlalchemy.orm import exc as orm_exc
+from sqlalchemy.orm import immediateload
 from sqlalchemy.orm import joinedload
 from sqlalchemy.orm import lazyload
 from sqlalchemy.orm import make_transient_to_detached
 from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
 from sqlalchemy.orm import Session
 from sqlalchemy.orm import strategies
+from sqlalchemy.orm import subqueryload
 from sqlalchemy.orm import undefer
 from sqlalchemy.sql import select
 from sqlalchemy.testing import assert_raises
@@ -298,39 +303,6 @@ class ExpireTest(_fixtures.FixtureTest):
             ],
         )
 
-    def test_refresh_collection_exception(self):
-        """test graceful failure for currently unsupported
-        immediate refresh of a collection"""
-
-        users, Address, addresses, User = (
-            self.tables.users,
-            self.classes.Address,
-            self.tables.addresses,
-            self.classes.User,
-        )
-
-        self.mapper_registry.map_imperatively(
-            User,
-            users,
-            properties={
-                "addresses": relationship(
-                    Address, order_by=addresses.c.email_address
-                )
-            },
-        )
-        self.mapper_registry.map_imperatively(Address, addresses)
-        s = fixture_session(
-            autoflush=True,
-        )
-        u = s.get(User, 8)
-        assert_raises_message(
-            sa_exc.InvalidRequestError,
-            "properties specified for refresh",
-            s.refresh,
-            u,
-            ["addresses"],
-        )
-
     def test_refresh_cancels_expire(self):
         users, User = self.tables.users, self.classes.User
 
@@ -987,6 +959,213 @@ class ExpireTest(_fixtures.FixtureTest):
 
         self.assert_sql_count(testing.db, go, 1)
 
+    @testing.combinations(
+        "selectin", "joined", "subquery", "immediate", argnames="lazy"
+    )
+    @testing.variation(
+        "as_option",
+        [True, False],
+    )
+    @testing.variation(
+        "expire_first", [True, False, "not_pk", "not_pk_plus_pending"]
+    )
+    @testing.variation("include_column", [True, False, "no_attrs"])
+    @testing.variation("autoflush", [True, False])
+    def test_load_only_relationships(
+        self, lazy, expire_first, include_column, as_option, autoflush
+    ):
+        """test #8703, #8997 as well as a regression for #8996"""
+
+        users, Address, addresses, User = (
+            self.tables.users,
+            self.classes.Address,
+            self.tables.addresses,
+            self.classes.User,
+        )
+
+        if expire_first.not_pk_plus_pending:
+            # if the test will be flushing a pk change, choose the user_id
+            # that has no address rows so that we dont get an FK violation.
+            # test only looks for the presence of "addresses" collection,
+            # not the contents
+            target_id = 10
+            target_name = "chuck"
+        else:
+            # for all the other cases use user_id 8 where the addresses
+            # collection has some elements.  this could theoretically catch
+            # any odd per-row issues with the collection load
+            target_id = 8
+            target_name = "ed"
+
+        self.mapper_registry.map_imperatively(
+            User,
+            users,
+            properties={
+                "addresses": relationship(
+                    Address,
+                    backref="user",
+                    lazy=lazy if not as_option else "select",
+                )
+            },
+        )
+        self.mapper_registry.map_imperatively(Address, addresses)
+        sess = fixture_session(autoflush=bool(autoflush))
+        if as_option:
+            fn = {
+                "joined": joinedload,
+                "selectin": selectinload,
+                "subquery": subqueryload,
+                "immediate": immediateload,
+            }[lazy]
+
+        u = sess.get(
+            User,
+            target_id,
+            options=([fn(User.addresses)] if as_option else []),
+        )
+
+        if expire_first.not_pk_plus_pending:
+            u.id = 25
+            sess.expire(u, ["name", "addresses"])
+
+            assert "addresses" not in u.__dict__
+            assert "name" not in u.__dict__
+            name_is_expired = True
+        elif expire_first.not_pk:
+            sess.expire(u, ["name", "addresses"])
+            assert "id" in u.__dict__
+            assert "addresses" not in u.__dict__
+            assert "name" not in u.__dict__
+            name_is_expired = True
+        elif expire_first:
+            sess.expire(u)
+            assert "id" not in u.__dict__
+            assert "addresses" not in u.__dict__
+            assert "name" not in u.__dict__
+            name_is_expired = True
+        else:
+            name_is_expired = False
+
+        if (
+            expire_first.not_pk_plus_pending
+            and not autoflush
+            and not include_column.no_attrs
+        ):
+            with expect_raises_message(
+                sa_exc.InvalidRequestError,
+                re.escape(
+                    "Please flush pending primary key changes on attributes "
+                    "{'id'} for mapper Mapper[User(users)] before proceeding "
+                    "with a refresh"
+                ),
+            ):
+                if include_column:
+                    sess.refresh(u, ["name", "addresses"])
+                else:
+                    sess.refresh(u, ["addresses"])
+
+            return
+
+        with self.sql_execution_asserter(testing.db) as asserter:
+            if include_column.no_attrs:
+                sess.refresh(u)
+                name_is_expired = False
+                id_was_refreshed = True
+            elif include_column:
+                sess.refresh(u, ["name", "addresses"])
+                name_is_expired = False
+                id_was_refreshed = False
+            else:
+                sess.refresh(u, ["addresses"])
+                id_was_refreshed = False
+
+        expected_count = 2 if lazy != "joined" else 1
+        if (
+            autoflush
+            and expire_first.not_pk_plus_pending
+            and not id_was_refreshed
+        ):
+            expected_count += 1
+
+        asserter.assert_(CountStatements(expected_count))
+
+        # pk there for all cases
+        assert "id" in u.__dict__
+
+        if name_is_expired:
+            assert "name" not in u.__dict__
+        else:
+            assert "name" in u.__dict__
+
+        assert "addresses" in u.__dict__
+        u.addresses
+        assert "addresses" in u.__dict__
+        if include_column:
+            eq_(u.__dict__["name"], target_name)
+
+        if expire_first.not_pk_plus_pending and not id_was_refreshed:
+            eq_(u.__dict__["id"], 25)
+        else:
+            eq_(u.__dict__["id"], target_id)
+
+    @testing.variation("expire_first", [True, False])
+    @testing.variation("autoflush", [True, False])
+    @testing.variation("ensure_name_cleared", [True, False])
+    def test_no_pending_pks_on_refresh(
+        self, expire_first, autoflush, ensure_name_cleared
+    ):
+        users = self.tables.users
+        User = self.classes.User
+
+        self.mapper_registry.map_imperatively(
+            User,
+            users,
+        )
+        sess = fixture_session(autoflush=bool(autoflush))
+
+        u = sess.get(User, 10)
+
+        u.id = 25
+
+        if ensure_name_cleared:
+            u.name = "newname"
+
+        if expire_first:
+            sess.expire(u, ["name"])
+
+        if ensure_name_cleared and not expire_first:
+            eq_(inspect(u).attrs.name.history, (["newname"], (), ["chuck"]))
+
+        if not autoflush:
+            with expect_raises_message(
+                sa_exc.InvalidRequestError,
+                re.escape(
+                    "Please flush pending primary key changes on attributes "
+                    "{'id'} for mapper Mapper[User(users)] before proceeding "
+                    "with a refresh"
+                ),
+            ):
+
+                sess.refresh(u, ["name"])
+
+            # id was not expired
+            eq_(inspect(u).attrs.id.history, ([25], (), [10]))
+
+            # name was expired
+            eq_(inspect(u).attrs.name.history, ((), (), ()))
+
+        else:
+            sess.refresh(u, ["name"])
+
+            # new id value stayed
+            eq_(u.__dict__["id"], 25)
+
+            # was autoflushed
+            eq_(inspect(u).attrs.id.history, ((), [25], ()))
+
+            # new value for "name" is lost, value was refreshed
+            eq_(inspect(u).attrs.name.history, ((), ["chuck"], ()))
+
     def test_expire_synonym(self):
         User, users = self.classes.User, self.tables.users