support that relationship. This applies to both a single and a
joined inheritance polymorphic load.
+ .. change:: 3991
+ :tags: bug, orm
+ :tickets: 3991
+
+ Added new argument :paramref:`.with_for_update` to the
+ :meth:`.Session.refresh` method. When the :meth:`.Query.with_lockmode`
+ method were deprecated in favor of :meth:`.Query.with_for_update`,
+ the :meth:`.Session.refresh` method was never updated to reflect
+ the new option.
+
+ .. seealso::
+
+ :ref:`change_3991`
+
.. change:: 3984
:tags: bug, orm
:tickets: 3984
:ticket:`3303`
+.. _change_3991:
+
+Added "for update" arguments to Session.refresh
+------------------------------------------------
+
+Added new argument :paramref:`.with_for_update` to the
+:meth:`.Session.refresh` method. When the :meth:`.Query.with_lockmode`
+method were deprecated in favor of :meth:`.Query.with_for_update`,
+the :meth:`.Session.refresh` method was never updated to reflect
+the new option::
+
+ session.refresh(some_object, with_for_update=True)
+
+The :paramref:`.Session.refresh.with_for_update` argument accepts a dictionary
+of options that will be passed as the same arguments which are sent to
+:meth:`.Query.with_for_update`::
+
+ session.refresh(some_objects with_for_update={"read": True})
+
+The new parameter supersedes the :paramref:`.Session.refresh.lockmode`
+parameter.
+
+:ticket:`3991`
+
New Features and Improvements - Core
====================================
def load_on_ident(query, key,
- refresh_state=None, lockmode=None,
+ refresh_state=None, with_for_update=None,
only_load_props=None):
"""Load the given identity key from the database."""
q._params = params
- if lockmode is not None:
+ # with_for_update needs to be query.LockmodeArg()
+ if with_for_update is not None:
version_check = True
- q = q.with_lockmode(lockmode)
+ q._for_update_arg = with_for_update
elif query._for_update_arg is not None:
version_check = True
q._for_update_arg = query._for_update_arg
"flush is occurring prematurely")
util.raise_from_cause(e)
- def refresh(self, instance, attribute_names=None, lockmode=None):
+ def refresh(
+ self, instance, attribute_names=None, with_for_update=None,
+ lockmode=None):
"""Expire and refresh the attributes on the given instance.
A query will be issued to the database and all attributes will be
string attribute names indicating a subset of attributes to
be refreshed.
+ :param with_for_update: optional boolean ``True`` indicating FOR UPDATE
+ should be used, or may be a dictionary containing flags to
+ indicate a more specific set of FOR UPDATE flags for the SELECT;
+ flags should match the parameters of :meth:`.Query.with_for_update`.
+ Supersedes the :paramref:`.Session.refresh.lockmode` parameter.
+
+ .. versionadded:: 1.2
+
:param lockmode: Passed to the :class:`~sqlalchemy.orm.query.Query`
as used by :meth:`~sqlalchemy.orm.query.Query.with_lockmode`.
+ Superseded by :paramref:`.Session.refresh.with_for_update`.
.. seealso::
self._expire_state(state, attribute_names)
+ if with_for_update == {}:
+ raise sa_exc.ArgumentError(
+ "with_for_update should be the boolean value "
+ "True, or a dictionary with options. "
+ "A blank dictionary is ambiguous.")
+
+ if lockmode:
+ with_for_update = query.LockmodeArg.parse_legacy_query(lockmode)
+ elif with_for_update is not None:
+ if with_for_update is True:
+ with_for_update = query.LockmodeArg()
+ elif with_for_update:
+ with_for_update = query.LockmodeArg(**with_for_update)
+ else:
+ with_for_update = None
+
if loading.load_on_ident(
self.query(object_mapper(instance)),
state.key, refresh_state=state,
- lockmode=lockmode,
+ with_for_update=with_for_update,
only_load_props=attribute_names) is None:
raise sa_exc.InvalidRequestError(
"Could not refresh instance '%s'" %
else:
return True
+ def __eq__(self, other):
+ return (
+ isinstance(other, ForUpdateArg) and
+ other.nowait == self.nowait and
+ other.read == self.read and
+ other.skip_locked == self.skip_locked and
+ other.key_share == self.key_share and
+ other.of is self.of
+ )
+
+ def __hash__(self):
+ return id(self)
+
def _copy_internals(self, clone=_clone, **kw):
if self.of is not None:
self.of = [clone(col, **kw) for col in self.of]
from test.orm import _fixtures
from sqlalchemy import event, ForeignKey
from sqlalchemy.util.compat import inspect_getargspec
-
+from sqlalchemy.testing import mock
class ExecutionTest(_fixtures.FixtureTest):
run_inserts = None
self._test_instance_guards(early)
self._test_class_guards(early, is_class=False)
+ def test_refresh_arg_signature(self):
+ class Mapped(object):
+ pass
+ self._map_it(Mapped)
+
+ m1 = Mapped()
+ s = create_session()
+
+ with mock.patch.object(s, "_validate_persistent"):
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ "with_for_update should be the boolean value True, "
+ "or a dictionary with options",
+ s.refresh, m1, with_for_update={}
+ )
+
+ with mock.patch(
+ "sqlalchemy.orm.session.loading.load_on_ident"
+ ) as load_on_ident:
+ s.refresh(m1, with_for_update={"read": True})
+ s.refresh(m1, with_for_update=True)
+ s.refresh(m1, with_for_update=False)
+ s.refresh(m1)
+
+ from sqlalchemy.orm.query import LockmodeArg
+ eq_(
+ [
+ call[-1]['with_for_update']
+ for call in load_on_ident.mock_calls],
+ [LockmodeArg(read=True), LockmodeArg(), None, None]
+ )
class TLTransactionTest(fixtures.MappedTest):
run_dispose_bind = 'once'
f1s2.value = 'f1 new value'
s2.commit()
+ # load, version is wrong
+ assert_raises_message(
+ sa.orm.exc.StaleDataError,
+ r"Instance .* has version id '\d+' which does not "
+ r"match database-loaded version id '\d+'",
+ s1.query(Foo).with_for_update(read=True).get, f1s1.id
+ )
+
+ # reload it - this expires the old version first
+ s1.refresh(f1s1, with_for_update={"read": True})
+
+ # now assert version OK
+ s1.query(Foo).with_for_update(read=True).get(f1s1.id)
+
+ # assert brand new load is OK too
+ s1.close()
+ s1.query(Foo).with_for_update(read=True).get(f1s1.id)
+
+ @testing.emits_warning(r'.*does not support updated rowcount')
+ @engines.close_open_connections
+ def test_versioncheck_legacy(self):
+ """query.with_lockmode performs a 'version check' on an already loaded
+ instance"""
+
+ Foo = self.classes.Foo
+
+ s1 = self._fixture()
+ f1s1 = Foo(value='f1 value')
+ s1.add(f1s1)
+ s1.commit()
+
+ s2 = create_session(autocommit=False)
+ f1s2 = s2.query(Foo).get(f1s1.id)
+ f1s2.value = 'f1 new value'
+ s2.commit()
+
# load, version is wrong
assert_raises_message(
sa.orm.exc.StaleDataError,
s1.add(f1s1)
s1.commit()
+ s2 = create_session(autocommit=False)
+ f1s2 = s2.query(Foo).get(f1s1.id)
+ # not sure if I like this API
+ s2.refresh(f1s2, with_for_update=True)
+ f1s2.value = 'f1 new value'
+
+ assert_raises(
+ exc.DBAPIError,
+ s1.refresh, f1s1, lockmode='update_nowait'
+ )
+ s1.rollback()
+
+ s2.commit()
+ s1.refresh(f1s1, with_for_update={"nowait": True})
+ assert f1s1.version_id == f1s2.version_id
+
+ @testing.emits_warning(r'.*does not support updated rowcount')
+ @engines.close_open_connections
+ @testing.requires.update_nowait
+ def test_versioncheck_for_update_legacy(self):
+ """query.with_lockmode performs a 'version check' on an already loaded
+ instance"""
+
+ Foo = self.classes.Foo
+
+ s1 = self._fixture()
+ f1s1 = Foo(value='f1 value')
+ s1.add(f1s1)
+ s1.commit()
+
s2 = create_session(autocommit=False)
f1s2 = s2.query(Foo).get(f1s1.id)
s2.refresh(f1s2, lockmode='update')