limit = kwargs.get('limit', None)
offset = kwargs.get('offset', None)
populate_existing = kwargs.get('populate_existing', False)
+ version_check = kwargs.get('version_check', False)
result = util.UniqueAppender([])
if mappers:
row = cursor.fetchone()
if row is None:
break
- self._instance(session, row, imap, result, populate_existing=populate_existing)
+ self._instance(session, row, imap, result, populate_existing=populate_existing, version_check=version_check)
i = 0
for m in mappers:
m._instance(session, row, imap, otherresults[i])
rows += c.cursor.rowcount
if c.supports_sane_rowcount() and rows != len(update):
- raise exceptions.FlushError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
+ raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
if len(insert):
statement = table.insert()
statement = table.delete(clause)
c = connection.execute(statement, delete)
if c.supports_sane_rowcount() and c.rowcount != len(delete):
- raise exceptions.FlushError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete)))
+ raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete)))
[self.extension.after_delete(self, connection, obj) for obj in deleted_objects]
def get_select_mapper(self):
return self.__surrogate_mapper or self
- def _instance(self, session, row, imap, result = None, populate_existing = False):
+ def _instance(self, session, row, imap, result = None, populate_existing = False, version_check=False):
"""pulls an object instance from the given row and appends it to the given result
list. if the instance already exists in the given identity map, its not added. in
either case, executes all the property loaders on the instance to also process extra
if session.has_key(identitykey):
instance = session._get(identitykey)
isnew = False
+ if version_check and self.version_id_col is not None and self._getattrbycolumn(instance, self.version_id_col) != row[self.version_id_col]:
+ raise exceptions.ConcurrentModificationError("Instance '%s' version of %s does not match %s" % (instance, self._getattrbycolumn(instance, self.version_id_col), row[self.version_id_col]))
+
if populate_existing or session.is_expired(instance, unexpire=True):
if not imap.has_key(identitykey):
imap[identitykey] = instance
def _get(self, key, ident=None, reload=False, lockmode=None):
lockmode = lockmode or self.lockmode
- if not reload and not self.always_refresh and lockmode == None:
+ if not reload and not self.always_refresh and lockmode is None:
try:
return self.session._get(key)
except KeyError:
i += 1
try:
statement = self.compile(self._get_clause, lockmode=lockmode)
- return self._select_statement(statement, params=params, populate_existing=reload)[0]
+ return self._select_statement(statement, params=params, populate_existing=reload, version_check=(lockmode is not None))[0]
except IndexError:
return None
# a concurrent session has modified this, should throw
# an exception
s.flush()
- except exceptions.SQLAlchemyError, e:
+ except exceptions.ConcurrentModificationError, e:
#print e
success = True
assert success
success = False
try:
s.flush()
- except exceptions.SQLAlchemyError, e:
+ except exceptions.ConcurrentModificationError, e:
#print e
success = True
assert success
+ def testversioncheck(self):
+ """test that query.with_lockmode performs a 'version check' on an already loaded instance"""
+ s1 = create_session()
+ class Foo(object):pass
+ assign_mapper(Foo, version_table, version_id_col=version_table.c.version_id)
+ f1s1 =Foo(value='f1', _sa_session=s1)
+ s1.flush()
+ s2 = create_session()
+ f1s2 = s2.query(Foo).get(f1s1.id)
+ f1s2.value='f1 new value'
+ s2.flush()
+ try:
+ # load, version is wrong
+ s1.query(Foo).with_lockmode('read').get(f1s1.id)
+ assert False
+ except exceptions.ConcurrentModificationError, e:
+ assert True
+ # reload it
+ s1.query(Foo).load(f1s1.id)
+ # now assert version OK
+ s1.query(Foo).with_lockmode('read').get(f1s1.id)
+
+ # assert brand new load is OK too
+ s1.clear()
+ s1.query(Foo).with_lockmode('read').get(f1s1.id)
+
+ def testnoversioncheck(self):
+ """test that query.with_lockmode works OK when the mapper has no version id col"""
+ s1 = create_session()
+ class Foo(object):pass
+ assign_mapper(Foo, version_table)
+ f1s1 =Foo(value='f1', _sa_session=s1)
+ f1s1.version_id=0
+ s1.flush()
+ s2 = create_session()
+ f1s2 = s2.query(Foo).with_lockmode('read').get(f1s1.id)
+ assert f1s2.id == f1s1.id
+ assert f1s2.value == f1s1.value
class UnicodeTest(SessionTest):
def setUpAll(self):