raise NotImplementedError()
- def last_inserted_params(self):
- """Return a dictionary of the full parameter dictionary for the last
- compiled INSERT statement.
-
- Includes any ColumnDefaults or Sequences that were pre-executed.
- """
-
- raise NotImplementedError()
-
- def last_updated_params(self):
- """Return a dictionary of the full parameter dictionary for the last
- compiled UPDATE statement.
-
- Includes any ColumnDefaults that were pre-executed.
- """
-
- raise NotImplementedError()
-
def lastrow_has_defaults(self):
"""Return True if the last INSERT or UPDATE row contained
inlined or database-side defaults.
did not explicitly specify returning().
"""
- if not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() expression construct.")
- elif self.context._is_explicit_returning:
- raise exc.InvalidRequestError(
- "Can't call inserted_primary_key when returning() "
- "is used.")
return self.context._inserted_primary_key
"""Return the primary key for the row just inserted."""
return self.inserted_primary_key
-
+
def last_updated_params(self):
"""Return ``last_updated_params()`` from the underlying
ExecutionContext.
See ExecutionContext for details.
"""
-
- return self.context.last_updated_params()
+
+ return self.context.last_updated_params
def last_inserted_params(self):
"""Return ``last_inserted_params()`` from the underlying
See ExecutionContext for details.
"""
- return self.context.last_inserted_params()
+ return self.context.last_inserted_params
def lastrow_has_defaults(self):
"""Return ``lastrow_has_defaults()`` from the underlying
ipk.append(row[c])
self._inserted_primary_key = ipk
-
- def last_inserted_params(self):
- return self._last_inserted_params
-
- def last_updated_params(self):
- return self._last_updated_params
-
+
def lastrow_has_defaults(self):
return hasattr(self, 'postfetch_cols') and len(self.postfetch_cols)
return None
else:
return self._exec_default(column.onupdate)
-
+
+ @util.memoized_property
+ def _inserted_primary_key(self):
+
+ if not self.isinsert:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() expression construct.")
+ elif self._is_explicit_returning:
+ raise exc.InvalidRequestError(
+ "Can't call inserted_primary_key when returning() "
+ "is used.")
+
+
+ # lazyily evaluate inserted_primary_key for executemany.
+ # for execute(), its already in __dict__.
+ if self.executemany:
+ return [
+ [compiled_parameters.get(c.key, None)
+ for c in self.compiled.\
+ statement.table.primary_key
+ ] for compiled_parameters in self.compiled_parameters
+ ]
+ else:
+ # _inserted_primary_key should be calced here
+ assert False
+
def __process_defaults(self):
"""Generate default values for compiled insert/update statements,
and generate inserted_primary_key collection.
param[c.key] = val
del self.current_parameters
+ if self.isinsert:
+ self.last_inserted_params = self.compiled_parameters
+ else:
+ self.last_updated_params = self.compiled_parameters
+
else:
self.current_parameters = compiled_parameters = \
self.compiled_parameters[0]
if val is not None:
compiled_parameters[c.key] = val
del self.current_parameters
-
- if self.isinsert:
+
+ if self.isinsert and not self._is_explicit_returning:
self._inserted_primary_key = [
- compiled_parameters.get(c.key, None)
- for c in self.compiled.\
+ self.compiled_parameters[0].get(c.key, None)
+ for c in self.compiled.\
statement.table.primary_key
- ]
- self._last_inserted_params = compiled_parameters
+ ]
+
+ if self.isinsert:
+ self.last_inserted_params = compiled_parameters
else:
- self._last_updated_params = compiled_parameters
+ self.last_updated_params = compiled_parameters
- self.postfetch_cols = self.compiled.postfetch
- self.prefetch_cols = self.compiled.prefetch
+ self.postfetch_cols = self.compiled.postfetch
+ self.prefetch_cols = self.compiled.prefetch
DefaultDialect.execution_ctx_cls = DefaultExecutionContext
params = {}
value_params = {}
hasdata = False
-
+ has_all_pks = True
+
if isinsert:
for col in mapper._cols_by_table[table]:
if col is mapper.version_id_col:
col not in pks:
params[col.key] = value
+ elif col in pks:
+ has_all_pks = False
elif isinstance(value, sql.ClauseElement):
value_params[col] = value
else:
params[col.key] = value
insert.append((state, state_dict, params, mapper,
- connection, value_params))
+ connection, value_params, has_all_pks))
else:
for col in mapper._cols_by_table[table]:
if col is mapper.version_id_col:
statement = self._memo(('update', table), update_stmt)
rows = 0
+ postfetch = []
for state, state_dict, params, mapper, \
connection, value_params in update:
else:
c = cached_connections[connection].\
execute(statement, params)
-
- mapper._postfetch(uowtransaction, table,
- state, state_dict, c,
- c.last_updated_params(), value_params)
-
+
+ postfetch.append((mapper, state, state_dict,
+ c.prefetch_cols(), c.postfetch_cols(),
+ c.last_updated_params(), value_params))
rows += c.rowcount
+ for mapper, pf in groupby(
+ postfetch, lambda rec: rec[0]
+ ):
+ mapper._postfetch(uowtransaction, table, pf)
+
+
if connection.dialect.supports_sane_rowcount:
if rows != len(update):
raise orm_exc.StaleDataError(
if insert:
statement = self._memo(('insert', table), table.insert)
+ postfetch = []
- for state, state_dict, params, mapper, \
- connection, value_params in insert:
-
- if value_params:
- c = connection.execute(
- statement.values(value_params),
- params)
- else:
+ for (connection, pkeys, hasvalue, has_all_pks), records in groupby(
+ insert, lambda rec: (rec[4], rec[2].keys(), bool(rec[5]), rec[6])
+ ):
+ if has_all_pks and not hasvalue:
+ records = list(records)
+ multiparams = [params for state, state_dict,
+ params, mapper, conn, value_params,
+ has_all_pks in records]
c = cached_connections[connection].\
- execute(statement, params)
+ execute(statement, multiparams)
+
+ for (state, state_dict, params, mapper, conn, value_params, has_all_pks), \
+ last_inserted_params in zip(records, c.context.compiled_parameters):
+ postfetch.append((mapper, state, state_dict,
+ c.prefetch_cols(), c.postfetch_cols(),
+ last_inserted_params, {}))
+
+ else:
+ for state, state_dict, params, mapper, \
+ connection, value_params, has_all_pks in records:
+
+ if value_params:
+ c = connection.execute(
+ statement.values(value_params),
+ params)
+ else:
+ c = cached_connections[connection].\
+ execute(statement, params)
- primary_key = c.inserted_primary_key
-
- if primary_key is not None:
- # set primary key attributes
- for pk, col in zip(primary_key,
- mapper._pks_by_table[table]):
- # TODO: make sure this inlined code is OK
- # with composites
- prop = mapper._columntoproperty[col]
- if state_dict.get(prop.key) is None:
- # TODO: would rather say:
- #state_dict[prop.key] = pk
- mapper._set_state_attr_by_column(state,
- state_dict,
- col, pk)
-
- mapper._postfetch(uowtransaction, table,
- state, state_dict, c,
- c.last_inserted_params(),
- value_params)
+ primary_key = c.inserted_primary_key
+
+ if primary_key is not None:
+ # set primary key attributes
+ for pk, col in zip(primary_key,
+ mapper._pks_by_table[table]):
+ # TODO: make sure this inlined code is OK
+ # with composites
+ prop = mapper._columntoproperty[col]
+ if state_dict.get(prop.key) is None:
+ # TODO: would rather say:
+ #state_dict[prop.key] = pk
+ mapper._set_state_attr_by_column(state,
+ state_dict,
+ col, pk)
+
+ postfetch.append((mapper, state, state_dict,
+ c.prefetch_cols(), c.postfetch_cols(),
+ c.last_inserted_params(), value_params))
+
+ for mapper, pf in groupby(
+ postfetch, lambda rec: rec[0]
+ ):
+ mapper._postfetch(uowtransaction, table, pf)
for state, state_dict, mapper, connection, has_identity, \
instance_key, row_switch in tups:
mapper.dispatch.on_after_update(mapper, connection, state)
def _postfetch(self, uowtransaction, table,
- state, dict_, resultproxy,
- params, value_params):
+ recs):
"""During a flush, expire attributes in need of newly
persisted database state."""
- postfetch_cols = resultproxy.postfetch_cols()
- generated_cols = list(resultproxy.prefetch_cols())
-
- if self.version_id_col is not None:
- generated_cols.append(self.version_id_col)
-
- for c in generated_cols:
- if c.key in params and c in self._columntoproperty:
- self._set_state_attr_by_column(state, dict_, c, params[c.key])
-
- if postfetch_cols:
- sessionlib._expire_state(state, state.dict,
- [self._columntoproperty[c].key
- for c in postfetch_cols]
- )
-
- # synchronize newly inserted ids from one table to the next
- # TODO: this still goes a little too often. would be nice to
- # have definitive list of "columns that changed" here
- for m, equated_pairs in self._table_to_equated[table]:
- sync.populate(state, m, state, m,
- equated_pairs,
- uowtransaction,
- self.passive_updates)
+ for m, state, dict_, prefetch_cols, postfetch_cols, \
+ params, value_params in recs:
+ postfetch_cols = postfetch_cols
+ generated_cols = list(prefetch_cols)
+
+ if self.version_id_col is not None:
+ generated_cols.append(self.version_id_col)
+
+ for c in generated_cols:
+ if c.key in params and c in self._columntoproperty:
+ self._set_state_attr_by_column(state, dict_, c, params[c.key])
+
+ if postfetch_cols:
+ sessionlib._expire_state(state, state.dict,
+ [self._columntoproperty[c].key
+ for c in postfetch_cols]
+ )
+
+ # synchronize newly inserted ids from one table to the next
+ # TODO: this still goes a little too often. would be nice to
+ # have definitive list of "columns that changed" here
+ for m, equated_pairs in self._table_to_equated[table]:
+ sync.populate(state, m, state, m,
+ equated_pairs,
+ uowtransaction,
+ self.passive_updates)
@util.memoized_property
def _table_to_equated(self):