except KeyError:
pass
obj.__dict__['_managed_trigger'] = callable
+
+ def untrigger_history(self, obj):
+ del obj.__dict__['_managed_trigger']
+ def has_trigger(self, obj):
+ return obj.__dict__.has_key('_managed_trigger')
+
def reset_history(self, obj, key):
"""removes the history object for the given attribute on the given object.
When the attribute is next accessed, a new container will be created via the
# including modifying any of its related items lists, as its already
# been exposed to being modified by the application.
identitykey = self._identity_key(row)
- if objectstore.get_session().has_key(identitykey):
- instance = objectstore.get_session()._get(identitykey)
+ sess = objectstore.get_session()
+ if sess.has_key(identitykey):
+ instance = sess._get(identitykey)
isnew = False
- if populate_existing:
+ if populate_existing or sess.is_expired(instance, unexpire=True):
if not imap.has_key(identitykey):
imap[identitykey] = instance
for prop in self.props.values():
prop.execute(instance, row, identitykey, imap, True)
-
if self.extension.append_result(self, row, imap, result, instance, isnew, populate_existing=populate_existing):
if result is not None:
result.append_nohistory(instance)
"""invalidates the data in the given objects and sets them to refresh themselves
the next time they are requested."""
for o in obj:
- global_attributes.trigger_history(o, lambda: refresh(o))
+ self.uow.expire(o)
def expunge(self, *obj):
for o in obj:
self.rollback_object(obj)
object_mapper(obj)._get(obj._instance_key, reload=True)
+ def expire(self, obj):
+ self.rollback_object(obj)
+ def exp():
+ object_mapper(obj)._get(obj._instance_key, reload=True)
+ global_attributes.trigger_history(obj, exp)
+
+ def is_expired(self, obj, unexpire=False):
+ ret = global_attributes.has_trigger(obj)
+ if ret and unexpire:
+ global_attributes.untrigger_history(obj)
+ return ret
+
def has_key(self, key):
"""returns True if the given key is present in this UnitOfWork's identity map."""
return self.identity_map.has_key(key)
def rollback_object(self, obj):
"""'rolls back' the attributes that have been changed on an object instance."""
self.attributes.rollback(obj)
+ try:
+ del self.dirty[obj]
+ except KeyError:
+ pass
+ try:
+ del self.deleted[obj]
+ except KeyError:
+ pass
class UOWTransaction(object):
"""handles the details of organizing and executing transaction tasks
self.assert_(a in u.addresses)
objectstore.expire(u)
- # expired, but not refreshed yet. still dirty
- self.assert_(u in objectstore.get_session().uow.dirty)
# get the attribute, it refreshes
self.assert_(u.user_name == 'jack')
self.assert_(a not in u.addresses)
- # not dirty anymore
- self.assert_(u not in objectstore.get_session().uow.dirty)
-
+
+ def testexpire(self):
+ m = mapper(User, users, properties={'addresses':relation(mapper(Address, addresses))})
+ u = m.get(7)
+ u.user_name = 'foo'
+ objectstore.expire(u)
+ # test plain expire
+ self.assert_(u.user_name =='jack')
+
+ # we're changing the database here, so if this test fails in the middle,
+ # it'll screw up the other tests which are hardcoded to 7/'jack'
+ u.user_name = 'foo'
+ objectstore.commit()
+ # change the value in the DB
+ users.update(users.c.user_id==7, values=dict(user_name='jack')).execute()
+ objectstore.expire(u)
+ # object isnt refreshed yet, using dict to bypass trigger
+ self.assert_(u.__dict__['user_name'] != 'jack')
+ # do a select
+ m.select()
+ # test that it refreshed
+ self.assert_(u.__dict__['user_name'] == 'jack')
+
+ # object should be back to normal now,
+ # this should *not* produce a SELECT statement (not tested here though....)
+ self.assert_(u.user_name =='jack')
+
def testrefresh2(self):
assign_mapper(Address, addresses)