--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 8703, 8997, 8996
+
+ A series of changes and improvements regarding
+ :meth:`_orm.Session.refresh`. The overall change is that primary key
+ attributes for an object are now included in a refresh operation
+ unconditionally when relationship-bound attributes are to be refreshed,
+ even if not expired and even if not specified in the refresh.
+
+ * Improved :meth:`_orm.Session.refresh` so that if autoflush is enabled
+ (as is the default for :class:`_orm.Session`), the autoflush takes place
+ at an earlier part of the refresh process so that pending primary key
+ changes are applied without errors being raised. Previously, this
+ autoflush took place too late in the process and the SELECT statement
+ would not use the correct key to locate the row and an
+ :class:`.InvalidRequestError` would be raised.
+
+ * When the above condition is present, that is, unflushed primary key
+ changes are present on the object, but autoflush is not enabled,
+ the refresh() method now explicitly disallows the operation to proceed,
+ and an informative :class:`.InvalidRequestError` is raised asking that
+ the pending primary key changes be flushed first. Previously,
+ this use case was simply broken and :class:`.InvalidRequestError`
+ would be raised anyway. This restriction is so that it's safe for the
+ primary key attributes to be refreshed, as is necessary for the case of
+ being able to refresh the object with relationship-bound secondary
+ eagerloaders also being emitted. This rule applies in all cases to keep
+ API behavior consistent regardless of whether or not the PK cols are
+ actually needed in the refresh, as it is unusual to be refreshing
+ some attributes on an object while keeping other attributes "pending"
+ in any case.
+
+ * The :meth:`_orm.Session.refresh` method has been enhanced such that
+ attributes which are :func:`_orm.relationship`-bound and linked to an
+ eager loader, either at mapping time or via last-used loader options,
+ will be refreshed in all cases even when a list of attributes is passed
+ that does not include any columns on the parent row. This builds upon the
+ feature first implemented for non-column attributes as part of
+ :ticket:`1763` fixed in 1.4 allowing eagerly-loaded relationship-bound
+ attributes to participate in the :meth:`_orm.Session.refresh` operation.
+ If the refresh operation does not indicate any columns on the parent row
+ to be refreshed, the primary key columns will nonetheless be included
+ in the refresh operation, which allows the load to proceed into the
+ secondary relationship loaders indicated as it does normally.
+ Previously an :class:`.InvalidRequestError` error would be raised
+ for this condition (:ticket:`8703`)
+
+ * Fixed issue where an unnecessary additional SELECT would be emitted in
+ the case where :meth:`_orm.Session.refresh` were called with a
+ combination of expired attributes, as well as an eager loader such as
+ :func:`_orm.selectinload` that emits a "secondary" query, if the primary
+ key attributes were also in an expired state. As the primary key
+ attributes are now included in the refresh automatically, there is no
+ additional load for these attributes when a relationship loader
+ goes to select for them (:ticket:`8997`)
+
+ * Fixed regression caused by :ticket:`8126` released in 2.0.0b1 where the
+ :meth:`_orm.Session.refresh` method would fail with an
+ ``AttributeError``, if passed both an expired column name as well as the
+ name of a relationship-bound attribute that was linked to a "secondary"
+ eagerloader such as the :func:`_orm.selectinload` eager loader
+ (:ticket:`8996`)
if not self.primary_columns:
if self.compile_options._only_load_props:
- raise sa_exc.InvalidRequestError(
- "No column-based properties specified for "
- "refresh operation. Use session.expire() "
- "to reload collections and related items."
- )
- else:
- raise sa_exc.InvalidRequestError(
- "Query contains no columns with which to SELECT from."
- )
+ assert False, "no columns were included in _only_load_props"
+
+ raise sa_exc.InvalidRequestError(
+ "Query contains no columns with which to SELECT from."
+ )
if not self.from_clauses:
self.from_clauses = list(self._fallback_from_clauses)
from .util import _none_set
from .util import state_str
from .. import exc as sa_exc
-from .. import future
from .. import util
from ..engine import result_tuple
from ..engine.result import ChunkedIteratorResult
from ..engine.result import FrozenResult
from ..engine.result import SimpleResultMetaData
+from ..sql import select
from ..sql import util as sql_util
from ..sql.selectable import ForUpdateArg
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
no_autoflush: bool = False,
bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
execution_options: _ExecuteOptions = util.EMPTY_DICT,
+ require_pk_cols: bool = False,
):
"""Load the given identity key from the database."""
if key is not None:
no_autoflush=no_autoflush,
bind_arguments=bind_arguments,
execution_options=execution_options,
+ require_pk_cols=require_pk_cols,
)
no_autoflush: bool = False,
bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
execution_options: _ExecuteOptions = util.EMPTY_DICT,
+ require_pk_cols: bool = False,
):
"""Load the given primary key identity from the database."""
else:
version_check = False
+ if require_pk_cols and only_load_props:
+ if not refresh_state:
+ raise sa_exc.ArgumentError(
+ "refresh_state is required when require_pk_cols is present"
+ )
+
+ refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
+ has_changes = {
+ key
+ for key in refresh_state_prokeys.difference(only_load_props)
+ if refresh_state.attrs[key].history.has_changes()
+ }
+ if has_changes:
+ # raise if pending pk changes are present.
+ # technically, this could be limited to the case where we have
+ # relationships in the only_load_props collection to be refreshed
+ # also (and only ones that have a secondary eager loader, at that).
+ # however, the error is in place across the board so that behavior
+ # here is easier to predict. The use case it prevents is one
+ # of mutating PK attrs, leaving them unflushed,
+ # calling session.refresh(), and expecting those attrs to remain
+ # still unflushed. It seems likely someone doing all those
+ # things would be better off having the PK attributes flushed
+ # to the database before tinkering like that (session.refresh() is
+ # tinkering).
+ raise sa_exc.InvalidRequestError(
+ f"Please flush pending primary key changes on "
+ "attributes "
+ f"{has_changes} for mapper {refresh_state.mapper} before "
+ "proceeding with a refresh"
+ )
+
+ # overall, the ORM has no internal flow right now for "dont load the
+ # primary row of an object at all, but fire off
+ # selectinload/subqueryload/immediateload for some relationships".
+ # It would probably be a pretty big effort to add such a flow. So
+ # here, the case for #8703 is introduced; user asks to refresh some
+ # relationship attributes only which are
+ # selectinload/subqueryload/immediateload/ etc. (not joinedload).
+ # ORM complains there's no columns in the primary row to load.
+ # So here, we just add the PK cols if that
+ # case is detected, so that there is a SELECT emitted for the primary
+ # row.
+ #
+ # Let's just state right up front, for this one little case,
+ # the ORM here is adding a whole extra SELECT just to satisfy
+ # limitations in the internal flow. This is really not a thing
+ # SQLAlchemy finds itself doing like, ever, obviously, we are
+ # constantly working to *remove* SELECTs we don't need. We
+ # rationalize this for now based on 1. session.refresh() is not
+ # commonly used 2. session.refresh() with only relationship attrs is
+ # even less commonly used 3. the SELECT in question is very low
+ # latency.
+ #
+ # to add the flow to not include the SELECT, the quickest way
+ # might be to just manufacture a single-row result set to send off to
+ # instances(), but we'd have to weave that into context.py and all
+ # that. For 2.0.0, we have enough big changes to navigate for now.
+ #
+ mp = refresh_state.mapper._props
+ for p in only_load_props:
+ if mp[p]._is_relationship:
+ only_load_props = refresh_state_prokeys.union(only_load_props)
+ break
+
if refresh_state and refresh_state.load_options:
compile_options += {"_current_path": refresh_state.load_path.parent}
q = q.options(*refresh_state.load_options)
refresh_state=refresh_state,
identity_token=identity_token,
)
+
q._compile_options = new_compile_options
q._order_by = None
"attribute refresh operation cannot proceed" % (state_str(state))
)
- has_key = bool(state.key)
-
- result = False
-
no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
# in the case of inheritance, particularly concrete and abstract
attribute_names = attribute_names.intersection(mapper.attrs.keys())
if mapper.inherits and not mapper.concrete:
+ # load based on committed attributes in the object, formed into
+ # a truncated SELECT that only includes relevant tables. does not
+ # currently use state.key
statement = mapper._optimized_get_statement(state, attribute_names)
if statement is not None:
- from .query import FromStatement
-
# undefer() isn't needed here because statement has the
# columns needed already, this implicitly undefers that column
stmt = FromStatement(mapper, statement)
- result = load_on_ident(
+ return load_on_ident(
session,
stmt,
None,
no_autoflush=no_autoflush,
)
- if result is False:
- if has_key:
- identity_key = state.key
- else:
- # this codepath is rare - only valid when inside a flush, and the
- # object is becoming persistent but hasn't yet been assigned
- # an identity_key.
- # check here to ensure we have the attrs we need.
- pk_attrs = [
- mapper._columntoproperty[col].key for col in mapper.primary_key
- ]
- if state.expired_attributes.intersection(pk_attrs):
- raise sa_exc.InvalidRequestError(
- "Instance %s cannot be refreshed - it's not "
- " persistent and does not "
- "contain a full primary key." % state_str(state)
- )
- identity_key = mapper._identity_key_from_state(state)
-
- if (
- _none_set.issubset(identity_key) and not mapper.allow_partial_pks
- ) or _none_set.issuperset(identity_key):
- util.warn_limited(
- "Instance %s to be refreshed doesn't "
- "contain a full primary key - can't be refreshed "
- "(and shouldn't be expired, either).",
- state_str(state),
+ # normal load, use state.key as the identity to SELECT
+ has_key = bool(state.key)
+
+ if has_key:
+ identity_key = state.key
+ else:
+ # this codepath is rare - only valid when inside a flush, and the
+ # object is becoming persistent but hasn't yet been assigned
+ # an identity_key.
+ # check here to ensure we have the attrs we need.
+ pk_attrs = [
+ mapper._columntoproperty[col].key for col in mapper.primary_key
+ ]
+ if state.expired_attributes.intersection(pk_attrs):
+ raise sa_exc.InvalidRequestError(
+ "Instance %s cannot be refreshed - it's not "
+ " persistent and does not "
+ "contain a full primary key." % state_str(state)
)
- return
+ identity_key = mapper._identity_key_from_state(state)
- result = load_on_ident(
- session,
- future.select(mapper).set_label_style(
- LABEL_STYLE_TABLENAME_PLUS_COL
- ),
- identity_key,
- refresh_state=state,
- only_load_props=attribute_names,
- no_autoflush=no_autoflush,
+ if (
+ _none_set.issubset(identity_key) and not mapper.allow_partial_pks
+ ) or _none_set.issuperset(identity_key):
+ util.warn_limited(
+ "Instance %s to be refreshed doesn't "
+ "contain a full primary key - can't be refreshed "
+ "(and shouldn't be expired, either).",
+ state_str(state),
)
+ return
+
+ result = load_on_ident(
+ session,
+ select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
+ identity_key,
+ refresh_state=state,
+ only_load_props=attribute_names,
+ no_autoflush=no_autoflush,
+ )
# if instance is pending, a refresh operation
# may not complete (even if PK attributes are assigned)
self._expire_state(state, attribute_names)
+ # this autoflush previously used to occur as a secondary effect
+ # of the load_on_ident below. Meaning we'd organize the SELECT
+ # based on current DB pks, then flush, then if pks changed in that
+ # flush, crash. this was unticketed but discovered as part of
+ # #8703. So here, autoflush up front, dont autoflush inside
+ # load_on_ident.
+ self._autoflush()
+
if with_for_update == {}:
raise sa_exc.ArgumentError(
"with_for_update should be the boolean value "
refresh_state=state,
with_for_update=with_for_update,
only_load_props=attribute_names,
+ require_pk_cols=True,
+ # technically unnecessary as we just did autoflush
+ # above, however removes the additional unnecessary
+ # call to _autoflush()
+ no_autoflush=True,
)
is None
):
from .base import _DEFER_FOR_STATE
from .base import _RAISE_FOR_STATE
from .base import _SET_DEFERRED_EXPIRED
+from .base import ATTR_WAS_SET
from .base import LoaderCallableStatus
from .base import PASSIVE_OFF
from .base import PassiveFlag
alternate_effective_path=alternate_effective_path,
execution_options=execution_options,
)
- state.get_impl(key).set_committed_value(state, dict_, value)
+ if value is not ATTR_WAS_SET:
+ state.get_impl(key).set_committed_value(
+ state, dict_, value
+ )
@log.class_logger
)
)
+ def test_refresh_column(self):
+ """refresh currently does not use the mapper "optimized get".
+
+ This could be added later by generalizing the code in
+ loading.py->load_scalar_attributes() to be used by session.refresh().
+
+ For #8703, where we are revisiting some of this logic for 2.0.0,
+ not doing this yet as enough is changing in 2.0 already.
+
+ """
+ A, B = self.classes("A", "B")
+ sess = fixture_session()
+ b1 = B(data="x")
+ sess.add(b1)
+ sess.flush()
+ pk = b1.id
+ sess.expire(b1, ["data"])
+
+ with self.sql_execution_asserter(testing.db) as asserter:
+ sess.refresh(b1, ["data"])
+
+ asserter.assert_(
+ CompiledSQL(
+ # full statement that has a JOIN in it. Note that
+ # a.id is not included in the SELECT list
+ "SELECT b.data FROM a JOIN b ON a.id = b.id "
+ "WHERE a.id = :pk_1",
+ [{"pk_1": pk}]
+ # if we used load_scalar_attributes(), it would look like
+ # this
+ # "SELECT b.data AS b_data FROM b WHERE :param_1 = b.id",
+ # [{"param_1": b_id}],
+ )
+ )
+
def test_load_from_unloaded_subclass(self):
A, B = self.classes("A", "B")
sess = fixture_session()
"""Attribute/instance expiration, deferral of attributes, etc."""
+import re
+
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import FetchedValue
from sqlalchemy.orm import defer
from sqlalchemy.orm import deferred
from sqlalchemy.orm import exc as orm_exc
+from sqlalchemy.orm import immediateload
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import lazyload
from sqlalchemy.orm import make_transient_to_detached
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import strategies
+from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import undefer
from sqlalchemy.sql import select
from sqlalchemy.testing import assert_raises
],
)
- def test_refresh_collection_exception(self):
- """test graceful failure for currently unsupported
- immediate refresh of a collection"""
-
- users, Address, addresses, User = (
- self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User,
- )
-
- self.mapper_registry.map_imperatively(
- User,
- users,
- properties={
- "addresses": relationship(
- Address, order_by=addresses.c.email_address
- )
- },
- )
- self.mapper_registry.map_imperatively(Address, addresses)
- s = fixture_session(
- autoflush=True,
- )
- u = s.get(User, 8)
- assert_raises_message(
- sa_exc.InvalidRequestError,
- "properties specified for refresh",
- s.refresh,
- u,
- ["addresses"],
- )
-
def test_refresh_cancels_expire(self):
users, User = self.tables.users, self.classes.User
self.assert_sql_count(testing.db, go, 1)
+ @testing.combinations(
+ "selectin", "joined", "subquery", "immediate", argnames="lazy"
+ )
+ @testing.variation(
+ "as_option",
+ [True, False],
+ )
+ @testing.variation(
+ "expire_first", [True, False, "not_pk", "not_pk_plus_pending"]
+ )
+ @testing.variation("include_column", [True, False, "no_attrs"])
+ @testing.variation("autoflush", [True, False])
+ def test_load_only_relationships(
+ self, lazy, expire_first, include_column, as_option, autoflush
+ ):
+ """test #8703, #8997 as well as a regression for #8996"""
+
+ users, Address, addresses, User = (
+ self.tables.users,
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User,
+ )
+
+ if expire_first.not_pk_plus_pending:
+ # if the test will be flushing a pk change, choose the user_id
+ # that has no address rows so that we dont get an FK violation.
+ # test only looks for the presence of "addresses" collection,
+ # not the contents
+ target_id = 10
+ target_name = "chuck"
+ else:
+ # for all the other cases use user_id 8 where the addresses
+ # collection has some elements. this could theoretically catch
+ # any odd per-row issues with the collection load
+ target_id = 8
+ target_name = "ed"
+
+ self.mapper_registry.map_imperatively(
+ User,
+ users,
+ properties={
+ "addresses": relationship(
+ Address,
+ backref="user",
+ lazy=lazy if not as_option else "select",
+ )
+ },
+ )
+ self.mapper_registry.map_imperatively(Address, addresses)
+ sess = fixture_session(autoflush=bool(autoflush))
+ if as_option:
+ fn = {
+ "joined": joinedload,
+ "selectin": selectinload,
+ "subquery": subqueryload,
+ "immediate": immediateload,
+ }[lazy]
+
+ u = sess.get(
+ User,
+ target_id,
+ options=([fn(User.addresses)] if as_option else []),
+ )
+
+ if expire_first.not_pk_plus_pending:
+ u.id = 25
+ sess.expire(u, ["name", "addresses"])
+
+ assert "addresses" not in u.__dict__
+ assert "name" not in u.__dict__
+ name_is_expired = True
+ elif expire_first.not_pk:
+ sess.expire(u, ["name", "addresses"])
+ assert "id" in u.__dict__
+ assert "addresses" not in u.__dict__
+ assert "name" not in u.__dict__
+ name_is_expired = True
+ elif expire_first:
+ sess.expire(u)
+ assert "id" not in u.__dict__
+ assert "addresses" not in u.__dict__
+ assert "name" not in u.__dict__
+ name_is_expired = True
+ else:
+ name_is_expired = False
+
+ if (
+ expire_first.not_pk_plus_pending
+ and not autoflush
+ and not include_column.no_attrs
+ ):
+ with expect_raises_message(
+ sa_exc.InvalidRequestError,
+ re.escape(
+ "Please flush pending primary key changes on attributes "
+ "{'id'} for mapper Mapper[User(users)] before proceeding "
+ "with a refresh"
+ ),
+ ):
+ if include_column:
+ sess.refresh(u, ["name", "addresses"])
+ else:
+ sess.refresh(u, ["addresses"])
+
+ return
+
+ with self.sql_execution_asserter(testing.db) as asserter:
+ if include_column.no_attrs:
+ sess.refresh(u)
+ name_is_expired = False
+ id_was_refreshed = True
+ elif include_column:
+ sess.refresh(u, ["name", "addresses"])
+ name_is_expired = False
+ id_was_refreshed = False
+ else:
+ sess.refresh(u, ["addresses"])
+ id_was_refreshed = False
+
+ expected_count = 2 if lazy != "joined" else 1
+ if (
+ autoflush
+ and expire_first.not_pk_plus_pending
+ and not id_was_refreshed
+ ):
+ expected_count += 1
+
+ asserter.assert_(CountStatements(expected_count))
+
+ # pk there for all cases
+ assert "id" in u.__dict__
+
+ if name_is_expired:
+ assert "name" not in u.__dict__
+ else:
+ assert "name" in u.__dict__
+
+ assert "addresses" in u.__dict__
+ u.addresses
+ assert "addresses" in u.__dict__
+ if include_column:
+ eq_(u.__dict__["name"], target_name)
+
+ if expire_first.not_pk_plus_pending and not id_was_refreshed:
+ eq_(u.__dict__["id"], 25)
+ else:
+ eq_(u.__dict__["id"], target_id)
+
+ @testing.variation("expire_first", [True, False])
+ @testing.variation("autoflush", [True, False])
+ @testing.variation("ensure_name_cleared", [True, False])
+ def test_no_pending_pks_on_refresh(
+ self, expire_first, autoflush, ensure_name_cleared
+ ):
+ users = self.tables.users
+ User = self.classes.User
+
+ self.mapper_registry.map_imperatively(
+ User,
+ users,
+ )
+ sess = fixture_session(autoflush=bool(autoflush))
+
+ u = sess.get(User, 10)
+
+ u.id = 25
+
+ if ensure_name_cleared:
+ u.name = "newname"
+
+ if expire_first:
+ sess.expire(u, ["name"])
+
+ if ensure_name_cleared and not expire_first:
+ eq_(inspect(u).attrs.name.history, (["newname"], (), ["chuck"]))
+
+ if not autoflush:
+ with expect_raises_message(
+ sa_exc.InvalidRequestError,
+ re.escape(
+ "Please flush pending primary key changes on attributes "
+ "{'id'} for mapper Mapper[User(users)] before proceeding "
+ "with a refresh"
+ ),
+ ):
+
+ sess.refresh(u, ["name"])
+
+ # id was not expired
+ eq_(inspect(u).attrs.id.history, ([25], (), [10]))
+
+ # name was expired
+ eq_(inspect(u).attrs.name.history, ((), (), ()))
+
+ else:
+ sess.refresh(u, ["name"])
+
+ # new id value stayed
+ eq_(u.__dict__["id"], 25)
+
+ # was autoflushed
+ eq_(inspect(u).attrs.id.history, ((), [25], ()))
+
+ # new value for "name" is lost, value was refreshed
+ eq_(inspect(u).attrs.name.history, ((), ["chuck"], ()))
+
def test_expire_synonym(self):
User, users = self.classes.User, self.tables.users