strategies for producing queries with uniquely-
named columns. [ticket:1729].
+ - [feature] A warning is emitted when a reference
+ to an instrumented collection is no longer
+ associated with the parent class due to
+ expiration/attribute refresh/collection
+ replacement, but an append
+ or remove operation is received on the
+ now-detached collection. [ticket:2476]
+
- [removed] Deprecated identifiers removed:
* allow_null_pks mapper() argument
accepts_scalar_loader = True
uses_objects = False
supports_population = True
+ collection = False
def delete(self, state, dict_):
accepts_scalar_loader = False
uses_objects = True
supports_population = True
+ collection = False
def delete(self, state, dict_):
old = self.get(state, dict_)
accepts_scalar_loader = False
uses_objects = True
supports_population = True
+ collection = True
def __init__(self, class_, key, callable_, dispatch,
typecallable=None, trackparent=False, extension=None,
collections.bulk_replace(new_values, old_collection, new_collection)
old_collection.unlink(old)
+ def _invalidate_collection(self, collection):
+ adapter = getattr(collection, '_sa_adapter')
+ adapter.invalidated = True
+
def set_committed_value(self, state, dict_, value):
"""Set an attribute value on the given instance and 'commit' it."""
of custom methods, such as to unwrap Zope security proxies.
"""
+ invalidated = False
+
def __init__(self, attr, owner_state, data):
self._key = attr.key
self._data = weakref.ref(data)
self.owner_state = owner_state
self.link_to_self(data)
+ def _warn_invalidated(self):
+ util.warn("This collection has been invalidated.")
+
@property
def data(self):
"The entity collection being adapted."
"""
if initiator is not False and item is not None:
+ if self.invalidated:
+ self._warn_invalidated()
return self.attr.fire_append_event(
self.owner_state,
self.owner_state.dict,
"""
if initiator is not False and item is not None:
+ if self.invalidated:
+ self._warn_invalidated()
self.attr.fire_remove_event(
self.owner_state,
self.owner_state.dict,
fire_remove_event().
"""
+ if self.invalidated:
+ self._warn_invalidated()
self.attr.fire_pre_remove_event(
self.owner_state,
self.owner_state.dict,
class _ProxyImpl(object):
accepts_scalar_loader = False
expire_missing = True
+ collection = False
def __init__(self, key):
self.key = key
uses_objects = True
accepts_scalar_loader = False
supports_population = False
+ collection = False
def __init__(self, class_, key, typecallable,
dispatch,
"""Remove the given attribute and any
callables associated with it."""
- dict_.pop(key, None)
+ old = dict_.pop(key, None)
+ if old is not None and self.manager[key].impl.collection:
+ self.manager[key].impl._invalidate_collection(old)
self.callables.pop(key, None)
def _expire_attribute_pre_commit(self, dict_, key):
The additional bookkeeping is finished up in commit_all().
+ Should only be called for scalar attributes.
+
This method is actually called a lot with joined-table
loading, when the second table isn't present in the result.
"""Remove the given attribute and set the given callable
as a loader."""
- dict_.pop(key, None)
+ old = dict_.pop(key, None)
+ if old is not None and self.manager[key].impl.collection:
+ self.manager[key].impl._invalidate_collection(old)
self.callables[key] = callable_
def _expire(self, dict_, modified_set):
if impl.accepts_scalar_loader and \
(impl.expire_missing or key in dict_):
self.callables[key] = self
- dict_.pop(key, None)
+ old = dict_.pop(key, None)
+ if impl.collection and old is not None:
+ impl._invalidate_collection(old)
self.manager.dispatch.expire(self, None)
impl = self.manager[key].impl
if impl.accepts_scalar_loader:
self.callables[key] = self
- dict_.pop(key, None)
+ old = dict_.pop(key, None)
+ if impl.collection and old is not None:
+ impl._invalidate_collection(old)
self.committed_state.pop(key, None)
if pending:
assert_raises_message
from test.lib import fixtures
from test.lib.util import gc_collect, all_partial_orderings
-from sqlalchemy.util import cmp, jython, topological
+from sqlalchemy.util import jython
from sqlalchemy import event
# global for pickling tests
teardown()
+
+class TestUnlink(fixtures.TestBase):
+ def setUp(self):
+ class A(object):
+ pass
+ class B(object):
+ pass
+ self.A = A
+ self.B = B
+ instrumentation.register_class(A)
+ instrumentation.register_class(B)
+ attributes.register_attribute(A, 'bs', uselist=True,
+ useobject=True)
+
+ def test_expired(self):
+ A, B = self.A, self.B
+ a1 = A()
+ coll = a1.bs
+ a1.bs.append(B())
+ state = attributes.instance_state(a1)
+ state._expire(state.dict, set())
+ assert_raises(
+ Warning,
+ coll.append, B()
+ )
+
+ def test_replaced(self):
+ A, B = self.A, self.B
+ a1 = A()
+ coll = a1.bs
+ a1.bs.append(B())
+ a1.bs = []
+ # a bulk replace empties the old collection
+ assert len(coll) == 0
+ coll.append(B())
+ assert len(coll) == 1
+
+ def test_pop_existing(self):
+ A, B = self.A, self.B
+ a1 = A()
+ coll = a1.bs
+ a1.bs.append(B())
+ state = attributes.instance_state(a1)
+ state._reset(state.dict, "bs")
+ assert_raises(
+ Warning,
+ coll.append, B()
+ )
+
+ def test_ad_hoc_lazy(self):
+ A, B = self.A, self.B
+ a1 = A()
+ coll = a1.bs
+ a1.bs.append(B())
+ state = attributes.instance_state(a1)
+ state._set_callable(state.dict, "bs", lambda: B())
+ assert_raises(
+ Warning,
+ coll.append, B()
+ )