def get(self, state, passive=False):
if passive:
- return self._get_collection(state, passive=True).added_items
+ return self._get_collection_history(state, passive=True).added_items
else:
return AppenderQuery(self, state)
- def get_collection(self, state, user_data=None):
- return self._get_collection(state, passive=True).added_items
+ def get_collection(self, state, user_data=None, passive=True):
+ if passive:
+ return self._get_collection_history(state, passive=passive).added_items
+ else:
+ history = self._get_collection_history(state, passive=passive)
+ return history.added_items + history.unchanged_items
def fire_append_event(self, state, value, initiator):
state.modified = True
raise NotImplementedError()
def get_history(self, state, passive=False):
- c = self._get_collection(state, passive)
+ c = self._get_collection_history(state, passive)
return (c.added_items, c.unchanged_items, c.deleted_items)
- def _get_collection(self, state, passive=False):
+ def _get_collection_history(self, state, passive=False):
try:
c = state.dict[self.key]
except KeyError:
def append(self, state, value, initiator, passive=False):
if initiator is not self:
- self._get_collection(state, passive=True).added_items.append(value)
+ self._get_collection_history(state, passive=True).added_items.append(value)
self.fire_append_event(state, value, initiator)
def remove(self, state, value, initiator, passive=False):
if initiator is not self:
- self._get_collection(state, passive=True).deleted_items.append(value)
+ self._get_collection_history(state, passive=True).deleted_items.append(value)
self.fire_remove_event(state, value, initiator)
def __iter__(self):
sess = self.__session()
if sess is None:
- return iter(self.attr._get_collection(self.instance._state, passive=True).added_items)
+ return iter(self.attr._get_collection_history(self.instance._state, passive=True).added_items)
else:
return iter(self._clone(sess))
def __getitem__(self, index):
sess = self.__session()
if sess is None:
- return self.attr._get_collection(self.instance._state, passive=True).added_items.__getitem__(index)
+ return self.attr._get_collection_history(self.instance._state, passive=True).added_items.__getitem__(index)
else:
return self._clone(sess).__getitem__(index)
def count(self):
sess = self.__session()
if sess is None:
- return len(self.attr._get_collection(self.instance._state, passive=True).added_items)
+ return len(self.attr._get_collection_history(self.instance._state, passive=True).added_items)
else:
return self._clone(sess).count()
oldlist = list(self)
else:
oldlist = []
- self.attr._get_collection(self.instance._state, passive=True).replace(oldlist, collection)
+ self.attr._get_collection_history(self.instance._state, passive=True).replace(oldlist, collection)
return oldlist
def append(self, item):
] == sess.query(User).all()
@testing.fails_on('maxdb')
- def test_delete(self):
+ def test_delete_nocascade(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), backref='user')
})
sess.delete(u.addresses[3])
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
+ sess.clear()
+ u = sess.query(User).get(u.id)
+
sess.delete(u)
# u.addresses relation will have to force the load
assert testing.db.scalar(addresses.count(addresses.c.user_id != None)) ==0
+ @testing.fails_on('maxdb')
+ def test_delete_cascade(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), backref='user', cascade="all, delete-orphan")
+ })
+ sess = create_session(autoflush=True)
+ u = User(name='ed')
+ u.addresses.append(Address(email_address='a'))
+ u.addresses.append(Address(email_address='b'))
+ u.addresses.append(Address(email_address='c'))
+ u.addresses.append(Address(email_address='d'))
+ u.addresses.append(Address(email_address='e'))
+ u.addresses.append(Address(email_address='f'))
+ sess.save(u)
+
+ assert Address(email_address='c') == u.addresses[2]
+ sess.delete(u.addresses[2])
+ sess.delete(u.addresses[4])
+ sess.delete(u.addresses[3])
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
+
+ sess.clear()
+ u = sess.query(User).get(u.id)
+
+ sess.delete(u)
+
+ # u.addresses relation will have to force the load
+ # of all addresses so that they can be updated
+ sess.flush()
+ sess.close()
+
+ assert testing.db.scalar(addresses.count()) ==0
+
@testing.fails_on('maxdb')
def test_remove_orphans(self):
mapper(User, users, properties={