parameters but this would require further development.
[ticket:1357]
+ - Fixed another location where autoflush was interfering
+ with session.merge(). autoflush is disabled completely
+ for the duration of merge() now. [ticket:1360]
+
- sql
- ``sqlalchemy.extract()`` is now dialect sensitive and can
extract components of timestamps idiomatically across the
dest_list = []
for current in instances:
_recursive[(current, self)] = True
- obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
+ obj = session._merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
dest_list.append(obj)
if dont_load:
current = instances[0]
if current is not None:
_recursive[(current, self)] = True
- obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
+ obj = session._merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
if dont_load:
dest_state.dict[self.key] = obj
for state, m, o in cascade_states:
self._delete_impl(state)
- def merge(self, instance, dont_load=False,
- _recursive=None):
+ def merge(self, instance, dont_load=False):
"""Copy the state an instance onto the persistent instance with the same identifier.
If there is no persistent instance currently associated with the
mapped with ``cascade="merge"``.
"""
- if _recursive is None:
- # TODO: this should be an IdentityDict for instances, but will
- # need a separate dict for PropertyLoader tuples
- _recursive = {}
- # Autoflush only on the topmost call
- self._autoflush()
-
+ # TODO: this should be an IdentityDict for instances, but will
+ # need a separate dict for PropertyLoader tuples
+ _recursive = {}
+ self._autoflush()
+ autoflush = self.autoflush
+ try:
+ self.autoflush = False
+ return self._merge(instance, dont_load=dont_load, _recursive=_recursive)
+ finally:
+ self.autoflush = autoflush
+
+ def _merge(self, instance, dont_load=False, _recursive=None):
mapper = _object_mapper(instance)
if instance in _recursive:
return _recursive[instance]
new_instance = False
state = attributes.instance_state(instance)
key = state.key
+
if key is None:
if dont_load:
raise sa_exc.InvalidRequestError(
self._update_impl(merged_state)
new_instance = True
else:
- merged = self.query(mapper.class_).autoflush(False).get(key[1])
+ merged = self.query(mapper.class_).get(key[1])
if merged is None:
merged = mapper.class_manager.new_instance()
sess.flush()
assert merged_user not in sess.new
+ @testing.resolve_artifact_names
+ def test_cascades_dont_autoflush_2(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address,
+ backref='user',
+ cascade="all, delete-orphan")
+ })
+ mapper(Address, addresses)
+
+ u = User(id=7, name='fred', addresses=[
+ Address(id=1, email_address='fred1'),
+ ])
+ sess = create_session(autoflush=True, autocommit=False)
+ sess.add(u)
+ sess.commit()
+
+ sess.expunge_all()
+
+ u = User(id=7, name='fred', addresses=[
+ Address(id=1, email_address='fred1'),
+ Address(id=2, email_address='fred2'),
+ ])
+ sess.merge(u)
+ assert sess.autoflush
+ sess.commit()
+
+
+
+
if __name__ == "__main__":
testenv.main()