callable that, given the current value of the "version_id_col",
returns the next version number. Can be used for alternate
versioning schemes such as uuid, timestamps. [ticket:1692]
+
+ - added "lockmode" kw argument to Session.refresh(), will
+ pass through the string value to Query the same as
+ in with_lockmode(), will also do version check for a
+ version_id_col-enabled mapping.
- Fixed bug in session.rollback() which involved not removing
formerly "pending" objects from the session before
for state, dict_ in states.items():
state.commit_all(dict_, self.identity_map)
- def refresh(self, instance, attribute_names=None):
+ def refresh(self, instance, attribute_names=None, lockmode=None):
"""Refresh the attributes on the given instance.
A query will be issued to the database and all attributes will be
Eagerly-loaded relational attributes will eagerly load within the
single refresh operation.
- The ``attribute_names`` argument is an iterable collection of
- attribute names indicating a subset of attributes to be refreshed.
-
+ :param attribute_names: optional. An iterable collection of
+ string attribute names indicating a subset of attributes to
+ be refreshed.
+
+ :param lockmode: Passed to the :class:`~sqlalchemy.orm.query.Query`
+ as used by :meth:`~sqlalchemy.orm.query.Query.with_lockmode`.
+
"""
try:
state = attributes.instance_state(instance)
self._validate_persistent(state)
if self.query(_object_mapper(instance))._get(
state.key, refresh_state=state,
+ lockmode=lockmode,
only_load_props=attribute_names) is None:
raise sa_exc.InvalidRequestError(
"Could not refresh instance '%s'" %
s2.commit()
# load, version is wrong
- assert_raises(sa.orm.exc.ConcurrentModificationError, s1.query(Foo).with_lockmode('read').get, f1s1.id)
+ assert_raises(
+ sa.orm.exc.ConcurrentModificationError,
+ s1.query(Foo).with_lockmode('read').get, f1s1.id
+ )
+
+ # load, version is wrong
+ assert_raises(
+ sa.orm.exc.ConcurrentModificationError,
+ s1.refresh, f1s1, lockmode='read'
+ )
# reload it
s1.query(Foo).populate_existing().get(f1s1.id)
+
# now assert version OK
s1.query(Foo).with_lockmode('read').get(f1s1.id)
# assert brand new load is OK too
s1.close()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
+
+
+
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections