# the MIT License: http://www.opensource.org/licenses/mit-license.php
import weakref
-
-from sqlalchemy import util as base_util
from sqlalchemy.orm import attributes
def add(self, state):
raise NotImplementedError()
- def remove(self, state):
- raise NotImplementedError()
-
def update(self, dict):
raise NotImplementedError("IdentityMap uses add() to insert data")
class WeakInstanceDict(IdentityMap):
def __init__(self):
IdentityMap.__init__(self)
- self._remove_mutex = base_util.threading.Lock()
def __getitem__(self, key):
state = dict.__getitem__(self, key)
dict.__setitem__(self, key, state)
self._manage_incoming_state(state)
- def remove_key(self, key):
- state = dict.__getitem__(self, key)
- self.remove(state)
-
- def remove(self, state):
- self._remove_mutex.acquire()
- try:
- if dict.pop(self, state.key) is not state:
- raise AssertionError(
- "State %s is not present in this "
- "identity map" % state)
- finally:
- self._remove_mutex.release()
-
- self._manage_removed_state(state)
-
- def discard(self, state):
- if self.contains_state(state):
- dict.__delitem__(self, state.key)
- self._manage_removed_state(state)
-
def get(self, key, default=None):
if not dict.__contains__(self, key):
return default
return default
return o
- def items(self):
+ def _items(self):
+ values = self.all_states()
+ result = []
+ for state in values:
+ value = state.obj()
+ if value is not None:
+ result.append((state.key, value))
+ return result
+
+ def _values(self):
+ values = self.all_states()
+ result = []
+ for state in values:
+ value = state.obj()
+ if value is not None:
+ result.append(value)
+
+ return result
+
+ # Py3K
+ #def items(self):
+ # return iter(self._items())
+ #
+ #def values(self):
+ # return iter(self._values())
# Py2K
- return list(self.iteritems())
-
+ items = _items
def iteritems(self):
- # end Py2K
- self._remove_mutex.acquire()
- try:
- result = []
- for state in dict.values(self):
- value = state.obj()
- if value is not None:
- result.append((state.key, value))
-
- return iter(result)
- finally:
- self._remove_mutex.release()
-
- def values(self):
- # Py2K
- return list(self.itervalues())
+ return iter(self.items())
+ values = _values
def itervalues(self):
+ return iter(self.values())
# end Py2K
- self._remove_mutex.acquire()
- try:
- result = []
- for state in dict.values(self):
- value = state.obj()
- if value is not None:
- result.append(value)
-
- return iter(result)
- finally:
- self._remove_mutex.release()
def all_states(self):
- self._remove_mutex.acquire()
- try:
- # Py3K
- # return list(dict.values(self))
+ # Py3K
+ # return list(dict.values(self))
+ # Py2K
+ return dict.values(self)
+ # end Py2K
- # Py2K
- return dict.values(self)
- # end Py2K
- finally:
- self._remove_mutex.release()
+ def discard(self, state):
+ st = dict.get(self, state.key, None)
+ if st is state:
+ dict.__delitem__(self, state.key)
+ self._manage_removed_state(state)
def prune(self):
return 0
dict.__setitem__(self, state.key, state.obj())
self._manage_incoming_state(state)
- def remove(self, state):
- if attributes.instance_state(dict.pop(self, state.key)) \
- is not state:
- raise AssertionError('State %s is not present in this '
- 'identity map' % state)
- self._manage_removed_state(state)
-
def discard(self, state):
- if self.contains_state(state):
- dict.__delitem__(self, state.key)
- self._manage_removed_state(state)
-
- def remove_key(self, key):
- state = attributes.instance_state(dict.__getitem__(self, key))
- self.remove(state)
+ obj = dict.get(self, state.key, None)
+ if obj is not None:
+ st = attributes.instance_state(obj)
+ if st is state:
+ dict.__delitem__(self, state.key)
+ self._manage_removed_state(state)
def prune(self):
"""prune unreferenced, non-dirty states."""