def _setattrbycolumn(self, obj, column, value):
self.columntoproperty[column][0].setattr(obj, value)
-
def save_obj(self, objects, uow, postupdate=False):
"""called by a UnitOfWork object to save objects, which involves either an INSERT or
list."""
#print "SAVE_OBJ MAPPER", self.class_.__name__, objects
connection = uow.transaction.connection(self)
+
+ if not postupdate:
+ for obj in objects:
+ if not hasattr(obj, "_instance_key"):
+ self.extension.before_insert(self, connection, obj)
+ else:
+ self.extension.before_update(self, connection, obj)
+
+ inserted_objects = util.Set()
+ updated_objects = util.Set()
for table in self.tables.sort(reverse=False):
#print "SAVE_OBJ table ", self.class_.__name__, table.name
# looping through our set of tables, which are all "real" tables, as opposed
statement = table.update(clause)
rows = 0
supports_sane_rowcount = True
+ def comparator(a, b):
+ for col in self.pks_by_table[table]:
+ x = cmp(a[1][col._label],b[1][col._label])
+ if x != 0:
+ return x
+ return 0
+ update.sort(comparator)
for rec in update:
(obj, params) = rec
c = connection.execute(statement, params)
self._postfetch(connection, table, obj, c, c.last_updated_params())
- self.extension.after_update(self, connection, obj)
+
+ updated_objects.add(obj)
rows += c.cursor.rowcount
+
if c.supports_sane_rowcount() and rows != len(update):
raise exceptions.FlushError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))
+
if len(insert):
statement = table.insert()
for rec in insert:
mapper._synchronizer.execute(obj, obj)
sync(self)
- self.extension.after_insert(self, connection, obj)
+ inserted_objects.add(obj)
+ if not postupdate:
+ [self.extension.after_insert(self, connection, obj) for obj in inserted_objects]
+ [self.extension.after_update(self, connection, obj) for obj in updated_objects]
def _postfetch(self, connection, table, obj, resultproxy, params):
"""after an INSERT or UPDATE, asks the returned result if PassiveDefaults fired off on the database side
DELETE statement for each table used by this mapper, for each object in the list."""
connection = uow.transaction.connection(self)
#print "DELETE_OBJ MAPPER", self.class_.__name__, objects
-
+
+ [self.extension.before_delete(self, connection, obj) for obj in objects]
+ deleted_objects = util.Set()
for table in self.tables.sort(reverse=True):
if not self._has_pks(table):
continue
delete = []
- deleted_objects = []
for obj in objects:
params = {}
if not hasattr(obj, "_instance_key"):
params[col.key] = self._getattrbycolumn(obj, col)
if self.version_id_col is not None:
params[self.version_id_col.key] = self._getattrbycolumn(obj, self.version_id_col)
- self.extension.before_delete(self, connection, obj)
- deleted_objects.append(obj)
+ deleted_objects.add(obj)
if len(delete):
+ def comparator(a, b):
+ for col in self.pks_by_table[table]:
+ x = cmp(a[col.key],b[col.key])
+ if x != 0:
+ return x
+ return 0
+ delete.sort(comparator)
clause = sql.and_()
for col in self.pks_by_table[table]:
clause.clauses.append(col == sql.bindparam(col.key, type=col.type))
c = connection.execute(statement, delete)
if c.supports_sane_rowcount() and c.rowcount != len(delete):
raise exceptions.FlushError("ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete)))
- for obj in deleted_objects:
- self.extension.after_delete(self, connection, obj)
+
+ [self.extension.after_delete(self, connection, obj) for obj in deleted_objects]
def _has_pks(self, table):
try: