in conjunction with large mapper graphs, large numbers of
objects:
+ - Removed all* O(N) scanning behavior from the flush() process,
+ i.e. operations that were scanning the full session,
+ including an extremely expensive one that was erroneously
+ assuming primary key values were changing when this
+ was not the case.
+
+ * one edge case remains which may invoke a full scan,
+ if an existing primary key attribute is modified
+ to a new value.
+
- The Session's "weak referencing" behavior is now *full* -
no strong references whatsoever are made to a mapped object
or related items/collections in its __dict__. Backrefs and
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
-__version__ = '0.5.3'
+__version__ = '0.5.4'
del inspect, sys
uowcommit.register_object(
attributes.instance_state(c),
isdelete=True)
- if not self.passive_updates and self._pks_changed(uowcommit, state):
+ if self._pks_changed(uowcommit, state):
if not history:
- history = uowcommit.get_attribute_history(state, self.key, passive=False)
- for child in history.unchanged:
- if child is not None:
- uowcommit.register_object(child)
+ history = uowcommit.get_attribute_history(state, self.key, passive=self.passive_updates)
+ if history:
+ for child in history.unchanged:
+ if child is not None:
+ uowcommit.register_object(child)
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
source = state
sync.populate(source, self.parent, dest, self.mapper, self.prop.synchronize_pairs)
def _pks_changed(self, uowcommit, state):
- return sync.source_changes(uowcommit, state, self.parent, self.prop.synchronize_pairs)
+ return sync.source_modified(uowcommit, state, self.parent, self.prop.synchronize_pairs)
class DetectKeySwitch(DependencyProcessor):
"""a special DP that works for many-to-one relations, fires off for
elem.dict[self.key] is not None and
attributes.instance_state(elem.dict[self.key]) in switchers
]:
- uowcommit.register_object(s, listonly=self.passive_updates)
+ uowcommit.register_object(s)
sync.populate(attributes.instance_state(s.dict[self.key]), self.mapper, s, self.parent, self.prop.synchronize_pairs)
def _pks_changed(self, uowcommit, state):
- return sync.source_changes(uowcommit, state, self.mapper, self.prop.synchronize_pairs)
+ return sync.source_modified(uowcommit, state, self.mapper, self.prop.synchronize_pairs)
class ManyToOneDP(DependencyProcessor):
def __init__(self, prop):
sync.populate_dict(child, self.mapper, associationrow, self.prop.secondary_synchronize_pairs)
def _pks_changed(self, uowcommit, state):
- return sync.source_changes(uowcommit, state, self.parent, self.prop.synchronize_pairs)
+ return sync.source_modified(uowcommit, state, self.parent, self.prop.synchronize_pairs)
class MapperStub(object):
"""Represent a many-to-many dependency within a flush
class IdentityMap(dict):
def __init__(self):
- self._mutable_attrs = {}
- self.modified = False
+ self._mutable_attrs = set()
+ self._modified = set()
self._wr = weakref.ref(self)
def replace(self, state):
def _manage_incoming_state(self, state):
state._instance_dict = self._wr
- if state.modified:
- self.modified = True
+ if state.modified:
+ self._modified.add(state)
if state.manager.mutable_attributes:
- self._mutable_attrs[state] = True
+ self._mutable_attrs.add(state)
def _manage_removed_state(self, state):
del state._instance_dict
+ self._mutable_attrs.discard(state)
+ self._modified.discard(state)
+
+ def _dirty_states(self):
+ return self._modified.union(s for s in self._mutable_attrs if s.modified)
- if state in self._mutable_attrs:
- del self._mutable_attrs[state]
-
def check_modified(self):
"""return True if any InstanceStates present have been marked as 'modified'."""
- if not self.modified:
- for state in list(self._mutable_attrs):
- if state.check_modified():
- return True
- else:
- return False
- else:
+ if self._modified:
return True
+ else:
+ for state in self._mutable_attrs:
+ if state.modified:
+ return True
+ return False
def has_key(self, key):
return key in self
self.session._expunge_state(s)
for s in self.session.identity_map.all_states():
- _expire_state(s, None)
+ _expire_state(s, None, instance_dict=self.session.identity_map)
def _remove_snapshot(self):
assert self._is_transaction_boundary
if not self.nested and self.session.expire_on_commit:
for s in self.session.identity_map.all_states():
- _expire_state(s, None)
+ _expire_state(s, None, instance_dict=self.session.identity_map)
def _connection_for_bind(self, bind):
self._assert_is_active()
def _finalize_loaded(self, states):
for state, dict_ in states.items():
- state.commit_all(dict_)
+ state.commit_all(dict_, self.identity_map)
def refresh(self, instance, attribute_names=None):
"""Refresh the attributes on the given instance.
"""Expires all persistent instances within this Session."""
for state in self.identity_map.all_states():
- _expire_state(state, None)
+ _expire_state(state, None, instance_dict=self.identity_map)
def expire(self, instance, attribute_names=None):
"""Expire the attributes on an instance.
raise exc.UnmappedInstanceError(instance)
self._validate_persistent(state)
if attribute_names:
- _expire_state(state, attribute_names=attribute_names)
+ _expire_state(state, attribute_names=attribute_names, instance_dict=self.identity_map)
else:
# pre-fetch the full cascade since the expire is going to
# remove associations
cascaded = list(_cascade_state_iterator('refresh-expire', state))
- _expire_state(state, None)
+ _expire_state(state, None, instance_dict=self.identity_map)
for (state, m, o) in cascaded:
- _expire_state(state, None)
+ _expire_state(state, None, instance_dict=self.identity_map)
def prune(self):
"""Remove unreferenced instances cached in the identity map.
state.key = instance_key
self.identity_map.replace(state)
- state.commit_all(state.dict)
-
+ state.commit_all(state.dict, self.identity_map)
+
# remove from new last, might be the last strong ref
if state in self._new:
if self._enable_transaction_accounting and self.transaction:
prop.merge(self, instance, merged, dont_load, _recursive)
if dont_load:
- attributes.instance_state(merged).commit_all(attributes.instance_dict(merged)) # remove any history
+ attributes.instance_state(merged).commit_all(attributes.instance_dict(merged), self.identity_map) # remove any history
if new_instance:
merged_state._run_on_load(merged)
not self._deleted and not self._new):
return
-
dirty = self._dirty_states
if not dirty and not self._deleted and not self._new:
- self.identity_map.modified = False
+ self.identity_map._modified.clear()
return
flush_context = UOWTransaction(self)
raise exc.UnmappedInstanceError(o)
objset.add(state)
else:
- # or just everything
- objset = set(self.identity_map.all_states()).union(new)
+ objset = None
# store objects whose fate has been decided
processed = set()
# put all saves/updates into the flush context. detect top-level
# orphans and throw them into deleted.
- for state in new.union(dirty).intersection(objset).difference(deleted):
+ if objset:
+ proc = new.union(dirty).intersection(objset).difference(deleted)
+ else:
+ proc = new.union(dirty).difference(deleted)
+
+ for state in proc:
is_orphan = _state_mapper(state)._is_orphan(state)
if is_orphan and not _state_has_identity(state):
path = ", nor ".join(
processed.add(state)
# put all remaining deletes into the flush context.
- for state in deleted.intersection(objset).difference(processed):
+ if objset:
+ proc = deleted.intersection(objset).difference(processed)
+ else:
+ proc = deleted.difference(processed)
+ for state in proc:
flush_context.register_object(state, isdelete=True)
if len(flush_context.tasks) == 0:
flush_context.finalize_flush_changes()
- if not objects:
- self.identity_map.modified = False
-
+ # useful assertions:
+ #if not objects:
+ # assert not self.identity_map._modified
+ #else:
+ # assert self.identity_map._modified == self.identity_map._modified.difference(objects)
+ #self.identity_map._modified.clear()
+
for ext in self.extensions:
ext.after_flush_postexec(self, flush_context)
those that were possibly deleted.
"""
- return util.IdentitySet(
- [state
- for state in self.identity_map.all_states()
- if state.modified])
+ return self.identity_map._dirty_states()
@property
def dirty(self):
key for key in self.manager.iterkeys()
if key not in self.committed_state and key not in self.dict)
- def expire_attributes(self, attribute_names):
+ def expire_attributes(self, attribute_names, instance_dict=None):
self.expired_attributes = set(self.expired_attributes)
if attribute_names is None:
attribute_names = self.manager.keys()
self.expired = True
+ if self.modified:
+ if not instance_dict:
+ instance_dict = self._instance_dict()
+ if instance_dict:
+ instance_dict._modified.discard(self)
+ else:
+ instance_dict._modified.discard(self)
+
self.modified = False
filter_deferred = True
else:
if needs_committed:
self.committed_state[attr.key] = previous
+ if not self.modified:
+ instance_dict = self._instance_dict()
+ if instance_dict:
+ instance_dict._modified.add(self)
+
self.modified = True
self._strong_obj = self.obj()
- instance_dict = self._instance_dict()
- if instance_dict:
- instance_dict.modified = True
-
def commit(self, dict_, keys):
"""Commit attributes.
self.expired_attributes.remove(key)
self.callables.pop(key, None)
- def commit_all(self, dict_):
+ def commit_all(self, dict_, instance_dict=None):
"""commit all attributes unconditionally.
This is used after a flush() or a full load/refresh
if key in dict_:
self.manager[key].impl.commit_to_state(self, dict_, self.committed_state)
+ if instance_dict and self.modified:
+ instance_dict._modified.discard(self)
+
self.modified = self.expired = False
self._strong_obj = None
dict_[r.key] = value
-def source_changes(uowcommit, source, source_mapper, synchronize_pairs):
+def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
+ """return true if the source object has changes from an old to a new value on the given
+ synchronize pairs
+
+ """
for l, r in synchronize_pairs:
try:
prop = source_mapper._get_col_to_prop(l)
except exc.UnmappedColumnError:
_raise_col_to_prop(False, source_mapper, l, None, r)
history = uowcommit.get_attribute_history(source, prop.key, passive=True)
- if history.has_changes():
- return True
- else:
- return False
-
-def dest_changes(uowcommit, dest, dest_mapper, synchronize_pairs):
- for l, r in synchronize_pairs:
- try:
- prop = dest_mapper._get_col_to_prop(r)
- except exc.UnmappedColumnError:
- _raise_col_to_prop(True, None, l, dest_mapper, r)
- history = uowcommit.get_attribute_history(dest, prop.key, passive=True)
- if history.has_changes():
+ if len(history.deleted):
return True
else:
return False
return history.as_state()
def register_object(self, state, isdelete=False, listonly=False, postupdate=False, post_update_cols=None):
+
# if object is not in the overall session, do nothing
if not self.session._contains_state(state):
if self._should_log_debug:
u1.address = a1
sess.add(a1)
sess.flush()
-
+
u1.username = 'ed'
def go():
sess.flush()
if passive_updates:
+ sess.expire(u1, ['address'])
self.assert_sql_count(testing.db, go, 1)
else:
self.assert_sql_count(testing.db, go, 2)
sess.flush()
self.assert_sql_count(testing.db, go, 0)
- assert a1.username == 'ed'
sess.expunge_all()
self.assertEquals([Address(username='ed')], sess.query(Address).all())
def go():
sess.flush()
if passive_updates:
+ sess.expire(u1, ['addresses'])
self.assert_sql_count(testing.db, go, 1)
else:
self.assert_sql_count(testing.db, go, 3)
u1 = sess.query(User).get('ed')
assert len(u1.addresses) == 2 # load addresses
u1.username = 'fred'
- print "--------------------------------"
def go():
sess.flush()
# check that the passive_updates is on on the other side
if passive_updates:
+ sess.expire(u1, ['addresses'])
self.assert_sql_count(testing.db, go, 1)
else:
self.assert_sql_count(testing.db, go, 3)
assert len(s.identity_map) == 1
user = s.query(User).one()
- assert not s.identity_map.modified
+ assert not s.identity_map._modified
user.name = 'u2'
- assert s.identity_map.modified
+ assert s.identity_map._modified
s.flush()
eq_(users.select().execute().fetchall(), [(user.id, 'u2')])