--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 5226
+
+ The refresh of an expired object will now trigger an autoflush if the list
+ of expired attributes include one or more attributes that were explicitly
+ expired or refreshed using the :meth:`.Session.expire` or
+ :meth:`.Session.refresh` methods. This is an attempt to find a middle
+ ground between the normal unexpiry of attributes that can happen in many
+ cases where autoflush is not desirable, vs. the case where attributes are
+ being explicitly expired or refreshed and it is possible that these
+ attributes depend upon other pending state within the session that needs to
+ be flushed. The two methods now also gain a new flag
+ :paramref:`.Session.expire.autoflush` and
+ :paramref:`.Session.refresh.autoflush`, defaulting to True; when set to
+ False, this will disable the autoflush that occurs on unexpire for these
+ attributes.
def load_on_ident(
- query, key, refresh_state=None, with_for_update=None, only_load_props=None
+ query,
+ key,
+ refresh_state=None,
+ with_for_update=None,
+ only_load_props=None,
+ no_autoflush=False,
):
"""Load the given identity key from the database."""
if key is not None:
else:
ident = identity_token = None
+ if no_autoflush:
+ query = query.autoflush(False)
+
return load_on_pk_identity(
query,
ident,
pl.loaders[token] = (token, limit_to_mapper, loader_callable, arg, kw)
-def load_scalar_attributes(mapper, state, attribute_names):
+def load_scalar_attributes(mapper, state, attribute_names, passive):
"""initiate a column-based attribute refresh operation."""
# assert mapper is _state_mapper(state)
result = False
+ no_autoflush = passive & attributes.NO_AUTOFLUSH
+
# in the case of inheritance, particularly concrete and abstract
# concrete inheritance, the class manager might have some keys
# of attributes on the superclass that we didn't actually map.
None,
only_load_props=attribute_names,
refresh_state=state,
+ no_autoflush=no_autoflush,
)
if result is False:
identity_key,
refresh_state=state,
only_load_props=attribute_names,
+ no_autoflush=no_autoflush,
)
# if instance is pending, a refresh operation
def __iter__(self):
context = self._compile_context()
context.statement.label_style = LABEL_STYLE_TABLENAME_PLUS_COL
- if self._autoflush and not self._populate_existing:
+ if self._autoflush:
self.session._autoflush()
return self._execute_and_instances(context)
)
util.raise_(e, with_traceback=sys.exc_info()[2])
- def refresh(
- self, instance, attribute_names=None, with_for_update=None,
- ):
+ def refresh(self, instance, attribute_names=None, with_for_update=None):
"""Expire and refresh the attributes on the given instance.
A query will be issued to the database and all attributes will be
for o, m, st_, dct_ in cascaded:
self._conditional_expire(st_)
- def _conditional_expire(self, state):
+ def _conditional_expire(self, state, autoflush=None):
"""Expire a state if persistent, else expunge if pending"""
if state.key:
def _expire(self, dict_, modified_set):
self.expired = True
-
if self.modified:
modified_set.discard(self)
self.committed_state.clear()
if not self.manager[attr].impl.load_on_unexpire
)
- self.manager.expired_attribute_loader(self, toload)
+ self.manager.expired_attribute_loader(self, toload, passive)
# if the loader failed, or this
# instance state didn't have an identity,
data = {"a": "this is a", "b": 12}
- def loader(state, keys):
+ def loader(state, keys, passive):
for k in keys:
state.dict[k] = data[k]
return attributes.ATTR_WAS_SET
data = {"a": "this is a", "b": 12}
- def loader(state, keys):
+ def loader(state, keys, passive):
for k in keys:
state.dict[k] = data[k]
return attributes.ATTR_WAS_SET
def test_deferred_pickleable(self):
data = {"a": "this is a", "b": 12}
- def loader(state, keys):
+ def loader(state, keys, passive):
for k in keys:
state.dict[k] = data[k]
return attributes.ATTR_WAS_SET
state.dict.pop("someattr", None)
state.expired_attributes.add("someattr")
- def scalar_loader(state, toload):
+ def scalar_loader(state, toload, passive):
state.dict["someattr"] = "one"
state.manager.expired_attribute_loader = scalar_loader
sess.add(u1)
sess.commit()
+ # in this case, u1.address has active history set, because
+ # this operation necessarily replaces the old object which must be
+ # loaded.
+ # the set operation requires that "u1" is unexpired, because the
+ # replace operation wants to load the
+ # previous value. The original test case for #2921 only included
+ # that the lazyload operation passed a no autoflush flag through
+ # to the operation, however in #5226 this has been enhanced to pass
+ # the no autoflush flag down through to the unexpire of the attributes
+ # as well, so that attribute unexpire can otherwise invoke autoflush.
+ assert "id" not in u1.__dict__
a2 = Address(email_address="asdf")
sess.add(a2)
u1.address = a2
import sqlalchemy as sa
from sqlalchemy import ForeignKey
+from sqlalchemy import func
from sqlalchemy import Integer
+from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy import util
from sqlalchemy.orm import aliased
eq_(a1.id, 1)
assert "x" in a1.__dict__
+
+
+class AutoflushTest(fixtures.DeclarativeMappedTest):
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+ bs = relationship("B")
+
+ class B(Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+
+ A.b_count = deferred(
+ select([func.count(1)]).where(A.id == B.a_id).scalar_subquery()
+ )
+
+ def test_deferred_autoflushes(self):
+ A, B = self.classes("A", "B")
+
+ s = Session()
+
+ a1 = A(id=1, bs=[B()])
+ s.add(a1)
+ s.commit()
+
+ eq_(a1.b_count, 1)
+ s.close()
+
+ a1 = s.query(A).first()
+ assert "b_count" not in a1.__dict__
+
+ b1 = B(a_id=1)
+ s.add(b1)
+
+ eq_(a1.b_count, 2)
+
+ assert b1 in s
from sqlalchemy import desc
from sqlalchemy import exc
from sqlalchemy import func
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import testing
elif isinstance(obj, self.classes.Order):
attrname = "items"
- eq_(attributes.get_history(obj, attrname), compare)
+ sess = inspect(obj).session
- if compare_passive is None:
- compare_passive = compare
+ if sess:
+ sess.autoflush = False
+ try:
+ eq_(attributes.get_history(obj, attrname), compare)
- eq_(
- attributes.get_history(
- obj, attrname, attributes.LOAD_AGAINST_COMMITTED
- ),
- compare_passive,
- )
+ if compare_passive is None:
+ compare_passive = compare
+
+ eq_(
+ attributes.get_history(
+ obj, attrname, attributes.LOAD_AGAINST_COMMITTED
+ ),
+ compare_passive,
+ )
+ finally:
+ if sess:
+ sess.autoflush = True
def test_append_transient(self):
u1, a1 = self._transient_fixture()
self.assert_sql_count(testing.db, go, 0)
+ def test_expire_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+ Address, addresses = self.classes.Address, self.tables.addresses
+
+ mapper(User, users)
+ mapper(Address, addresses, properties={"user": relationship(User)})
+
+ s = Session()
+
+ a1 = s.query(Address).get(2)
+ u1 = s.query(User).get(7)
+ a1.user = u1
+
+ s.expire(a1, ["user_id"])
+
+ # autoflushes
+ eq_(a1.user_id, 7)
+
def test_persistence_check(self):
users, User = self.tables.users, self.classes.User
lambda: s.refresh(u),
)
+ def test_refresh_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+ Address, addresses = self.classes.Address, self.tables.addresses
+
+ mapper(User, users)
+ mapper(Address, addresses, properties={"user": relationship(User)})
+
+ s = Session()
+
+ a1 = s.query(Address).get(2)
+ u1 = s.query(User).get(7)
+ a1.user = u1
+
+ s.refresh(a1, ["user_id"])
+
+ # autoflushes
+ eq_(a1.user_id, 7)
+
def test_refresh_expired(self):
User, users = self.classes.User, self.tables.users
assert c3 in p1.children
def test_autoflush_on_pending(self):
+ # ensure p1.id is not expired
+ p1.id
+
c3 = Child()
sess.add(c3)
c3.parent_id = p1.id
assert c3.parent is None
def test_autoflush_load_on_pending_on_pending(self):
+ # ensure p1.id is not expired
+ p1.id
+
Child.parent.property.load_on_pending = True
c3 = Child()
sess.add(c3)
for manualflush in (False, True):
Child.parent.property.load_on_pending = loadonpending
sess.autoflush = autoflush
+
+ # ensure p2.id not expired
+ p2.id
+
c2 = Child()
sess.add(c2)
c2.parent_id = p2.id
# you should get a FlushError on update.
f1.value = "f1rev2"
- f1.version_id = None
with conditional_sane_rowcount_warnings(
update=True, only_returning=True
):
+ f1.version_id = None
assert_raises_message(
sa.orm.exc.FlushError,
"Instance does not contain a non-NULL version value",
s1.expire_all()
- f1.value = "f2"
- f1.version_id = 2
-
with conditional_sane_rowcount_warnings(
update=True, only_returning=True
):
+ f1.value = "f2"
+ f1.version_id = 2
s1.flush()