self.dispatch._active_history = True
self.expire_missing = expire_missing
- self._modified_token = None
+ self._modified_token = Event(self, OP_MODIFIED)
__slots__ = (
'class_', 'key', 'callable_', 'dispatch', 'trackparent',
'_modified_token', 'accepts_scalar_loader'
)
- def _init_modified_token(self):
- self._modified_token = Event(self, OP_MODIFIED)
- return self._modified_token
-
def __str__(self):
return "%s.%s" % (self.class_.__name__, self.key)
uses_objects = False
supports_population = True
collection = False
+ dynamic = False
__slots__ = '_replace_token', '_append_token', '_remove_token'
def __init__(self, *arg, **kw):
super(ScalarAttributeImpl, self).__init__(*arg, **kw)
- self._replace_token = self._append_token = None
- self._remove_token = None
-
- def _init_append_token(self):
self._replace_token = self._append_token = Event(self, OP_REPLACE)
- return self._replace_token
-
- _init_append_or_replace_token = _init_append_token
-
- def _init_remove_token(self):
self._remove_token = Event(self, OP_REMOVE)
- return self._remove_token
def delete(self, state, dict_):
def fire_replace_event(self, state, dict_, value, previous, initiator):
for fn in self.dispatch.set:
value = fn(
- state, value, previous,
- initiator or self._replace_token or
- self._init_append_or_replace_token())
+ state, value, previous, initiator or self._replace_token)
return value
def fire_remove_event(self, state, dict_, value, initiator):
for fn in self.dispatch.remove:
- fn(state, value,
- initiator or self._remove_token or self._init_remove_token())
+ fn(state, value, initiator or self._remove_token)
@property
def type(self):
def delete(self, state, dict_):
old = self.get(state, dict_)
- self.fire_remove_event(
- state, dict_, old,
- self._remove_token or self._init_remove_token())
+ self.fire_remove_event(state, dict_, old, self._remove_token)
del dict_[self.key]
def get_history(self, state, dict_, passive=PASSIVE_OFF):
self.sethasparent(instance_state(value), state, False)
for fn in self.dispatch.remove:
- fn(state, value, initiator or
- self._remove_token or self._init_remove_token())
+ fn(state, value, initiator or self._remove_token)
state._modified_event(dict_, self, value)
for fn in self.dispatch.set:
value = fn(
- state, value, previous, initiator or
- self._replace_token or self._init_append_or_replace_token())
+ state, value, previous, initiator or self._replace_token)
state._modified_event(dict_, self, previous)
uses_objects = True
supports_population = True
collection = True
+ dynamic = False
__slots__ = (
'copy', 'collection_factory', '_append_token', '_remove_token',
- '_duck_typed_as'
+ '_bulk_replace_token', '_duck_typed_as'
)
def __init__(self, class_, key, callable_, dispatch,
copy_function = self.__copy
self.copy = copy_function
self.collection_factory = typecallable
- self._append_token = None
- self._remove_token = None
+ self._append_token = Event(self, OP_APPEND)
+ self._remove_token = Event(self, OP_REMOVE)
+ self._bulk_replace_token = Event(self, OP_BULK_REPLACE)
self._duck_typed_as = util.duck_type_collection(
self.collection_factory())
def unlink(target, collection, collection_adapter):
collection._sa_linker(None)
- def _init_append_token(self):
- self._append_token = Event(self, OP_APPEND)
- return self._append_token
-
- def _init_remove_token(self):
- self._remove_token = Event(self, OP_REMOVE)
- return self._remove_token
-
def __copy(self, item):
return [y for y in collections.collection_adapter(item)]
def fire_append_event(self, state, dict_, value, initiator):
for fn in self.dispatch.append:
value = fn(
- state, value,
- initiator or self._append_token or self._init_append_token())
+ state, value, initiator or self._append_token)
state._modified_event(dict_, self, NEVER_SET, True)
self.sethasparent(instance_state(value), state, False)
for fn in self.dispatch.remove:
- fn(state, value,
- initiator or self._remove_token or self._init_remove_token())
+ fn(state, value, initiator or self._remove_token)
state._modified_event(dict_, self, NEVER_SET, True)
iterable = iter(iterable)
new_values = list(iterable)
- evt = Event(self, OP_BULK_REPLACE)
+ evt = self._bulk_replace_token
self.dispatch.bulk_replace(state, new_values, evt)
def backref_listeners(attribute, key, uselist):
"""Apply listeners to synchronize a two-way relationship."""
- # use easily recognizable names for stack traces
+ # use easily recognizable names for stack traces.
+
+ # in the sections marked "tokens to test for a recursive loop",
+ # this is somewhat brittle and very performance-sensitive logic
+ # that is specific to how we might arrive at each event. a marker
+ # that can target us directly to arguments being invoked against
+ # the impl might be simpler, but could interfere with other systems.
parent_token = attribute.impl.parent_token
parent_impl = attribute.impl
instance_dict(oldchild)
impl = old_state.manager[key].impl
- if initiator.impl is not impl or \
- initiator.op is OP_APPEND:
+ # tokens to test for a recursive loop.
+ if not impl.collection and not impl.dynamic:
+ check_recursive_token = impl._replace_token
+ else:
+ check_recursive_token = impl._remove_token
+
+ if initiator is not check_recursive_token:
impl.pop(old_state,
old_dict,
state.obj(),
- parent_impl._append_token or
- parent_impl._init_append_token(),
+ parent_impl._append_token,
passive=PASSIVE_NO_FETCH)
if child is not None:
child_state, child_dict = instance_state(child),\
instance_dict(child)
child_impl = child_state.manager[key].impl
+
if initiator.parent_token is not parent_token and \
initiator.parent_token is not child_impl.parent_token:
_acceptable_key_err(state, initiator, child_impl)
- elif initiator.impl is not child_impl or \
- initiator.op is OP_REMOVE:
+
+ # tokens to test for a recursive loop.
+ check_append_token = child_impl._append_token
+ check_bulk_replace_token = child_impl._bulk_replace_token \
+ if child_impl.collection else None
+
+ if initiator is not check_append_token and \
+ initiator is not check_bulk_replace_token:
child_impl.append(
child_state,
child_dict,
if initiator.parent_token is not parent_token and \
initiator.parent_token is not child_impl.parent_token:
_acceptable_key_err(state, initiator, child_impl)
- elif initiator.impl is not child_impl or \
- initiator.op is OP_REMOVE:
+
+ # tokens to test for a recursive loop.
+ check_append_token = child_impl._append_token
+ check_bulk_replace_token = child_impl._bulk_replace_token \
+ if child_impl.collection else None
+
+ if initiator is not check_append_token and \
+ initiator is not check_bulk_replace_token:
child_impl.append(
child_state,
child_dict,
child_state, child_dict = instance_state(child),\
instance_dict(child)
child_impl = child_state.manager[key].impl
- if initiator.impl is not child_impl or \
- initiator.op is OP_APPEND:
+
+ # tokens to test for a recursive loop.
+ if not child_impl.collection and not child_impl.dynamic:
+ check_remove_token = child_impl._remove_token
+ check_replace_token = child_impl._replace_token
+ else:
+ check_remove_token = child_impl._remove_token
+ check_replace_token = child_impl._bulk_replace_token \
+ if child_impl.collection else None
+
+ if initiator is not check_remove_token and \
+ initiator is not check_replace_token:
child_impl.pop(
child_state,
child_dict,
"""
state, dict_ = instance_state(instance), instance_dict(instance)
impl = state.manager[key].impl
- impl.dispatch.modified(
- state, impl._modified_token or impl._init_modified_token())
+ impl.dispatch.modified(state, impl._modified_token)
state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
class_mapper, backref, sessionmaker, Session
from sqlalchemy.orm import attributes, exc as orm_exc
from sqlalchemy import testing
-from sqlalchemy.testing import eq_
+from sqlalchemy.testing import eq_, is_
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
assert a1 not in u1.addresses
assert a1 in u2.addresses
+ def test_collection_assignment_mutates_previous_one(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ u1 = User(name='jack')
+ u2 = User(name='ed')
+ a1 = Address(email_address='a1')
+ u1.addresses.append(a1)
+
+ is_(a1.user, u1)
+
+ u2.addresses = [a1]
+
+ eq_(u1.addresses, [])
+
+ is_(a1.user, u2)
+
+ def test_collection_assignment_mutates_previous_two(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+
+ u1.addresses.append(a1)
+
+ is_(a1.user, u1)
+
+ u1.addresses = []
+ is_(a1.user, None)
+
+ def test_del_from_collection(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+
+ u1.addresses.append(a1)
+
+ is_(a1.user, u1)
+
+ del u1.addresses[0]
+
+ is_(a1.user, None)
+
+ def test_del_from_scalar(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+
+ u1.addresses.append(a1)
+
+ is_(a1.user, u1)
+
+ del a1.user
+
+ assert a1 not in u1.addresses
+
class O2OScalarBackrefMoveTest(_fixtures.FixtureTest):
run_inserts = None
session.commit()
eq_(k1.items, [i1])
+ def test_bulk_replace(self):
+ Item, Keyword = (self.classes.Item, self.classes.Keyword)
+
+ k1 = Keyword(name='k1')
+ k2 = Keyword(name='k2')
+ k3 = Keyword(name='k3')
+ i1 = Item(description='i1', keywords=[k1, k2])
+ i2 = Item(description='i2', keywords=[k3])
+
+ i1.keywords = [k2, k3]
+ assert i1 in k3.items
+ assert i2 in k3.items
+ assert i1 not in k1.items
+
class M2MScalarMoveTest(_fixtures.FixtureTest):
run_inserts = None