from . import loading
-def save_obj(base_mapper, states, uowtransaction, single=False):
+def save_obj(
+ base_mapper, states, uowtransaction, single=False,
+ bookkeeping=True):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list
of objects.
states_to_insert, states_to_update = _organize_states_for_save(
base_mapper,
states,
- uowtransaction)
+ uowtransaction, bookkeeping)
cached_connections = _cached_connection_dict(base_mapper)
for table, mapper in base_mapper._sorted_tables.items():
insert = _collect_insert_commands(base_mapper, uowtransaction,
- table, states_to_insert)
+ table, states_to_insert,
+ bookkeeping)
update = _collect_update_commands(base_mapper, uowtransaction,
table, states_to_update)
mapper, table, insert)
_finalize_insert_update_commands(base_mapper, uowtransaction,
- states_to_insert, states_to_update)
+ states_to_insert, states_to_update,
+ bookkeeping)
def post_update(base_mapper, states, uowtransaction, post_update_cols):
mapper.dispatch.after_delete(mapper, connection, state)
-def _organize_states_for_save(base_mapper, states, uowtransaction):
+def _organize_states_for_save(
+ base_mapper, states, uowtransaction, bookkeeping):
"""Make an initial pass across a set of states for INSERT or
UPDATE.
# no instance_key attached to it), and another instance
# with the same identity key already exists as persistent.
# convert to an UPDATE if so.
- if not has_identity and \
+ if bookkeeping and not has_identity and \
instance_key in uowtransaction.session.identity_map:
instance = \
uowtransaction.session.identity_map[instance_key]
def _collect_insert_commands(base_mapper, uowtransaction, table,
- states_to_insert):
+ states_to_insert, bookkeeping):
"""Identify sets of values to use in INSERT statements for a
list of states.
value = state_dict.get(prop.key, None)
if value is None:
- if col in pks:
+ if bookkeeping and col in pks:
has_all_pks = False
elif col.default is None and \
col.server_default is None:
params[col.key] = value
- elif col.server_default is not None and \
+ elif bookkeeping and col.server_default is not None and \
mapper.base_mapper.eager_defaults:
has_all_defaults = False
def _finalize_insert_update_commands(base_mapper, uowtransaction,
- states_to_insert, states_to_update):
+ states_to_insert, states_to_update,
+ bookkeeping):
"""finalize state on states that have been inserted or updated,
including calling after_insert/after_update events.
instance_key, row_switch in states_to_insert + \
states_to_update:
- if mapper._readonly_props:
- readonly = state.unmodified_intersection(
- [p.key for p in mapper._readonly_props
- if p.expire_on_flush or p.key not in state.dict]
- )
- if readonly:
- state._expire_attributes(state.dict, readonly)
-
- # if eager_defaults option is enabled, load
- # all expired cols. Else if we have a version_id_col, make sure
- # it isn't expired.
- toload_now = []
-
- if base_mapper.eager_defaults:
- toload_now.extend(state._unloaded_non_object)
- elif mapper.version_id_col is not None and \
- mapper.version_id_generator is False:
- prop = mapper._columntoproperty[mapper.version_id_col]
- if prop.key in state.unloaded:
- toload_now.extend([prop.key])
-
- if toload_now:
- state.key = base_mapper._identity_key_from_state(state)
- loading.load_on_ident(
- uowtransaction.session.query(base_mapper),
- state.key, refresh_state=state,
- only_load_props=toload_now)
+ if bookkeeping:
+ if mapper._readonly_props:
+ readonly = state.unmodified_intersection(
+ [p.key for p in mapper._readonly_props
+ if p.expire_on_flush or p.key not in state.dict]
+ )
+ if readonly:
+ state._expire_attributes(state.dict, readonly)
+
+ # if eager_defaults option is enabled, load
+ # all expired cols. Else if we have a version_id_col, make sure
+ # it isn't expired.
+ toload_now = []
+
+ if base_mapper.eager_defaults:
+ toload_now.extend(state._unloaded_non_object)
+ elif mapper.version_id_col is not None and \
+ mapper.version_id_generator is False:
+ prop = mapper._columntoproperty[mapper.version_id_col]
+ if prop.key in state.unloaded:
+ toload_now.extend([prop.key])
+
+ if toload_now:
+ state.key = base_mapper._identity_key_from_state(state)
+ loading.load_on_ident(
+ uowtransaction.session.query(base_mapper),
+ state.key, refresh_state=state,
+ only_load_props=toload_now)
# call after_XXX extensions
if not has_identity:
with util.safe_reraise():
transaction.rollback(_capture_exception=True)
+ def bulk_save(self, objects):
+ self._flushing = True
+ flush_context = UOWTransaction(self)
+
+ if self.dispatch.before_bulk_save:
+ self.dispatch.before_bulk_save(
+ self, flush_context, objects)
+
+ flush_context.transaction = transaction = self.begin(
+ subtransactions=True)
+ try:
+ self._warn_on_events = True
+ try:
+ flush_context.bulk_save(objects)
+ finally:
+ self._warn_on_events = False
+
+ self.dispatch.after_bulk_save(
+ self, flush_context, objects
+ )
+
+ flush_context.finalize_flush_changes()
+
+ self.dispatch.after_bulk_save_postexec(
+ self, flush_context, objects)
+
+ transaction.commit()
+
+ except:
+ with util.safe_reraise():
+ transaction.rollback(_capture_exception=True)
+ finally:
+ self._flushing = False
+
def is_modified(self, instance, include_collections=True,
passive=True):
"""Return ``True`` if the given instance has locally
from .. import util, event
from ..util import topological
from . import attributes, persistence, util as orm_util
+import itertools
def track_cascade_events(descriptor, prop):
execute() method has succeeded and the transaction has been committed.
"""
+ if not self.states:
+ return
+
states = set(self.states)
isdel = set(
s for (s, (isdelete, listonly)) in self.states.items()
if isdelete
)
other = states.difference(isdel)
- self.session._remove_newly_deleted(isdel)
- self.session._register_newly_persistent(other)
+ if isdel:
+ self.session._remove_newly_deleted(isdel)
+ if other:
+ self.session._register_newly_persistent(other)
+
+ def bulk_save(self, objects):
+ for (base_mapper, in_session), states in itertools.groupby(
+ (attributes.instance_state(obj) for obj in objects),
+ lambda state:
+ (
+ state.mapper.base_mapper,
+ state.key is self.session.hash_key
+ )):
+
+ persistence.save_obj(
+ base_mapper, list(states), self, bookkeeping=in_session)
+
+ if in_session:
+ self.states.update(
+ (state, (False, False))
+ for state in states
+ )
class IterateMappersMixin(object):