have not, you might notice your apps using a lot fewer queries than
before in some situations. [ticket:871]
+ - mutable primary key support is added. primary key columns can be
+ changed freely, and the identity of the instance will change upon
+ flush. In addition, update cascades of foreign key referents (primary
+ key or not) along relations are supported, either in tandem with the
+ database's ON UPDATE CASCADE (required for databases like Postgres) or
+ issued directly by the ORM in the form of UPDATE statements, by setting
+ the flag "passive_updates=False".
+
- added "cascade delete" behavior to "dynamic" relations just like
that of regular relations. if passive_deletes flag (also just added)
is not set, a delete of the parent item will trigger a full load of
When `passive_deletes` is applied, the `children` relation will not be loaded into memory when an instance of `MyClass` is marked for deletion. The `cascade="all, delete-orphan"` *will* take effect for instances of `MyOtherClass` which are currently present in the session; however for instances of `MyOtherClass` which are not loaded, SQLAlchemy assumes that "ON DELETE CASCADE" rules will ensure that those rows are deleted by the database and that no foreign key violation will occur.
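A configuration along these lines, using the `MyClass`/`MyOtherClass` names above (the table and column names here are illustrative), might look like:

    {python}
    mytable = Table('mytable', metadata,
        Column('id', Integer, primary_key=True))

    myothertable = Table('myothertable', metadata,
        Column('id', Integer, primary_key=True),
        Column('parent_id', Integer, ForeignKey('mytable.id', ondelete="CASCADE")))

    class MyClass(object):
        pass
    class MyOtherClass(object):
        pass

    mapper(MyOtherClass, myothertable)
    mapper(MyClass, mytable, properties={
        'children':relation(MyOtherClass, cascade="all, delete-orphan", passive_deletes=True)
    })

With `ondelete="CASCADE"` present on the foreign key, the database removes the child rows itself, and `passive_deletes=True` tells the ORM not to load them just to delete them.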
+#### Mutable Primary Keys / Update Cascades {@name=mutablepks}
+
+As of SQLAlchemy 0.4.2, the primary key attributes of an instance can be changed freely, and will be persisted upon flush. When the primary key of an entity changes, related items which reference the primary key must be updated as well. For databases which enforce referential integrity, it's required to use the database's ON UPDATE CASCADE functionality in order to propagate primary key changes. For those which don't, the `passive_updates` flag can be set to `False`, which instructs SQLAlchemy to issue UPDATE statements individually. The `passive_updates` flag can also be `False` in conjunction with ON UPDATE CASCADE functionality, although in that case the ORM issues UPDATE statements unnecessarily.
+
+A typical mutable primary key setup might look like:
+
+ {python}
+ users = Table('users', metadata,
+ Column('username', String(50), primary_key=True),
+ Column('fullname', String(100)))
+
+ addresses = Table('addresses', metadata,
+ Column('email', String(50), primary_key=True),
+ Column('username', String(50), ForeignKey('users.username', onupdate="cascade")))
+
+ class User(object):
+ pass
+ class Address(object):
+ pass
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address, passive_updates=False)
+ })
+ mapper(Address, addresses)
+
+`passive_updates` is set to `True` by default. Foreign key references to non-primary key columns are supported as well.
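+As a sketch of the non-primary-key case (these table and column names are illustrative), a foreign key may reference a unique, non-primary-key column and still participate in update cascades in the same way:
+
+    {python}
+    users = Table('users', metadata,
+        Column('id', Integer, primary_key=True),
+        Column('username', String(50), unique=True),
+        Column('fullname', String(100)))
+
+    addresses = Table('addresses', metadata,
+        Column('id', Integer, primary_key=True),
+        Column('username', String(50), ForeignKey('users.username', onupdate="cascade")))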
indicates the ordering that should be applied when loading these items.
passive_deletes=False
- Indicates the behavior of delete operations.
+ Indicates loading behavior during delete operations.
+
A value of True indicates that unloaded child items should not be loaded
during a delete operation on the parent. Normally, when a parent
item is deleted, all child items are loaded so that they can either be
or error raise scenario is in place on the database side. Note that
the foreign key attributes on in-session child objects will not be changed
after a flush occurs so this is a very special use-case setting.
-
+
+ passive_updates=True
+ Indicates loading and INSERT/UPDATE/DELETE behavior when the source
+ of a foreign key value changes (i.e. an "on update" cascade); the source
+ columns are typically the primary key columns of the source row.
+
+ When True, it is assumed that ON UPDATE CASCADE is configured on the
+ foreign key in the database, and that the database will handle propagation of an
+ UPDATE from a source column to dependent rows. Note that with databases
+ which enforce referential integrity (e.g. Postgres, MySQL with InnoDB tables),
+ ON UPDATE CASCADE is required for this operation. The relation() will
+ update the value of the attribute on related items which are locally present
+ in the session during a flush.
+
+ When False, it is assumed that the database does not enforce referential
+ integrity and will not be issuing its own CASCADE operation for an update.
+ The relation() will issue the appropriate UPDATE statements to the database
+ in response to the change of a referenced key, and items locally present
+ in the session during a flush will also be refreshed.
+
+ This flag should probably be set to False if primary key changes are expected
+ and the database in use doesn't support CASCADE (e.g. SQLite, MySQL MyISAM tables).
+
post_update
this indicates that the relationship should be handled by a second
UPDATE statement after an INSERT or before a DELETE. Currently, it also
def set(self, state, value, initiator):
raise NotImplementedError()
-
+
+ def get_committed_value(self, state):
+ if state.committed_state is not None:
+ if self.key not in state.committed_state:
+ self.get(state)
+ return state.committed_state.get(self.key)
+ else:
+ return None
+
def set_committed_value(self, state, value):
"""set an attribute value on the given instance and 'commit' it.
return types[prop.direction](prop)
class DependencyProcessor(object):
+ no_dependencies = False
+
def __init__(self, prop):
self.prop = prop
self.cascade = prop.cascade
self.post_update = prop.post_update
self.foreign_keys = prop.foreign_keys
self.passive_deletes = prop.passive_deletes
+ self.passive_updates = prop.passive_updates
self.enable_typechecks = prop.enable_typechecks
self.key = prop.key
else:
self.syncrules.compile(self.prop.primaryjoin, foreign_keys=self.foreign_keys)
- def get_object_dependencies(self, state, uowcommit, passive = True):
- key = ("dependencies", state, self.key, passive)
-
- # cache the objects, not the states; the strong reference here
- # prevents newly loaded objects from being dereferenced during the
- # flush process
- if key in uowcommit.attributes:
- (added, unchanged, deleted) = uowcommit.attributes[key]
- else:
- (added, unchanged, deleted) = attributes.get_history(state, self.key, passive = passive)
- uowcommit.attributes[key] = (added, unchanged, deleted)
-
- if added is None:
- return (added, unchanged, deleted)
- else:
- return (
- [getattr(c, '_state', None) for c in added],
- [getattr(c, '_state', None) for c in unchanged],
- [getattr(c, '_state', None) for c in deleted],
- )
def _conditional_post_update(self, state, uowcommit, related):
"""Execute a post_update call.
if x is not None:
uowcommit.register_object(state, postupdate=True, post_update_cols=self.syncrules.dest_columns())
break
-
+
+ def _pks_changed(self, uowcommit, state):
+ return self.syncrules.source_changes(uowcommit, state)
+
def __str__(self):
return "%s(%s)" % (self.__class__.__name__, str(self.prop))
# is on.
if (not self.cascade.delete or self.post_update) and not self.passive_deletes=='all':
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if unchanged or deleted:
for child in deleted:
if child is not None and self.hasparent(child) is False:
self._conditional_post_update(child, uowcommit, [state])
else:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=True)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=True)
if added or deleted:
for child in added:
self._synchronize(state, child, None, False, uowcommit)
for child in deleted:
if not self.cascade.delete_orphan and not self.hasparent(child):
self._synchronize(state, child, None, True, uowcommit)
-
+
+ if self._pks_changed(uowcommit, state):
+ if unchanged:
+ for child in unchanged:
+ self._synchronize(state, child, None, False, uowcommit)
+
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
# the child objects have to have their foreign key to the parent set to NULL
if not self.post_update and not self.cascade.delete and not self.passive_deletes=='all':
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if unchanged or deleted:
for child in deleted:
if child is not None and self.hasparent(child) is False:
uowcommit.register_object(child)
else:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=True)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=True)
if added or deleted:
for child in added:
if child is not None:
uowcommit.register_object(child, isdelete=True)
for c, m in self.mapper.cascade_iterator('delete', child):
uowcommit.register_object(c._state, isdelete=True)
-
+ if not self.passive_updates and self._pks_changed(uowcommit, state):
+ if not unchanged:
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=False)
+ if unchanged:
+ for child in unchanged:
+ uowcommit.register_object(child)
+
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
if child is not None:
child = getattr(child, '_state', child)
self._verify_canload(child)
self.syncrules.execute(source, dest, source, child, clearkeys)
+class DetectKeySwitch(DependencyProcessor):
+ """a special DP that works for many-to-one relations, fires off for
+ child items who have changed their referenced key."""
+
+ no_dependencies = True
+
+ def register_dependencies(self, uowcommit):
+ uowcommit.register_processor(self.parent, self, self.mapper)
+
+ def preprocess_dependencies(self, task, deplist, uowcommit, delete=False):
+ # for non-passive updates, register in the preprocess stage
+ # so that mapper save_obj() gets a hold of changes
+ if not delete and not self.passive_updates:
+ self._process_key_switches(deplist, uowcommit)
+
+ def process_dependencies(self, task, deplist, uowcommit, delete=False):
+ # for passive updates, register objects in the process stage
+ # so that we avoid ManyToOneDP's registering the object without
+ # the listonly flag in its own preprocess stage (results in UPDATE
+ # statements being emitted)
+ if not delete and self.passive_updates:
+ self._process_key_switches(deplist, uowcommit)
+
+ def _process_key_switches(self, deplist, uowcommit):
+ switchers = util.Set(s for s in deplist if self._pks_changed(uowcommit, s))
+ if switchers:
+ # yes, we're doing a linear search right now through the UOW. only
+ # takes effect when primary key values have actually changed.
+ # a possible optimization might be to enhance the "hasparents" capability of
+ # attributes to actually store all parent references, but this introduces
+ # more complicated attribute accounting.
+ for s in [elem for elem in uowcommit.session.identity_map.all_states()
+ if issubclass(elem.class_, self.parent.class_) and
+ self.key in elem.dict and
+ elem.dict[self.key]._state in switchers
+ ]:
+ uowcommit.register_object(s, listonly=self.passive_updates)
+ self.syncrules.execute(s.dict[self.key]._state, s, None, None, False)
+
class ManyToOneDP(DependencyProcessor):
+ def __init__(self, prop):
+ DependencyProcessor.__init__(self, prop)
+ self.mapper._dependency_processors.append(DetectKeySwitch(prop))
+
def register_dependencies(self, uowcommit):
if self.post_update:
if not self.is_backref:
else:
uowcommit.register_dependency(self.mapper, self.parent)
uowcommit.register_processor(self.mapper, self, self.parent)
+
def process_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
# before we can DELETE the row
for state in deplist:
self._synchronize(state, None, None, True, uowcommit)
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if added or unchanged or deleted:
self._conditional_post_update(state, uowcommit, deleted + unchanged + added)
else:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=True)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=True)
if added or deleted or unchanged:
for child in added:
self._synchronize(state, child, None, False, uowcommit)
if delete:
if self.cascade.delete:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if deleted or unchanged:
for child in deleted + unchanged:
if child is not None and self.hasparent(child) is False:
for state in deplist:
uowcommit.register_object(state)
if self.cascade.delete_orphan:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if deleted:
for child in deleted:
if self.hasparent(child) is False:
for c, m in self.mapper.cascade_iterator('delete', child):
uowcommit.register_object(c._state, isdelete=True)
+
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
source = child
dest = state
connection = uowcommit.transaction.connection(self.mapper)
secondary_delete = []
secondary_insert = []
-
+ secondary_update = []
+
if hasattr(self.prop, 'reverse_property'):
reverse_dep = getattr(self.prop.reverse_property, '_dependency_processor', None)
else:
if delete:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=self.passive_deletes)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if deleted or unchanged:
for child in deleted + unchanged:
if child is None or (reverse_dep and (reverse_dep, "manytomany", child, state) in uowcommit.attributes):
uowcommit.attributes[(self, "manytomany", state, child)] = True
else:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key)
if added or deleted:
for child in added:
if child is None or (reverse_dep and (reverse_dep, "manytomany", child, state) in uowcommit.attributes):
self._synchronize(state, child, associationrow, False, uowcommit)
uowcommit.attributes[(self, "manytomany", state, child)] = True
secondary_delete.append(associationrow)
-
+
+ if not self.passive_updates and unchanged and self._pks_changed(uowcommit, state):
+ for child in unchanged:
+ associationrow = {}
+ self.syncrules.update(associationrow, state, child, "old_")
+ secondary_update.append(associationrow)
+
if secondary_delete:
secondary_delete.sort()
# TODO: precompile the delete/insert queries?
result = connection.execute(statement, secondary_delete)
if result.supports_sane_multi_rowcount() and result.rowcount != len(secondary_delete):
raise exceptions.ConcurrentModificationError("Deleted rowcount %d does not match number of objects deleted %d" % (result.rowcount, len(secondary_delete)))
-
+
+ if secondary_update:
+ statement = self.secondary.update(sql.and_(*[c == sql.bindparam("old_" + c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow]))
+ result = connection.execute(statement, secondary_update)
+ if result.supports_sane_multi_rowcount() and result.rowcount != len(secondary_update):
+ raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (result.rowcount, len(secondary_update)))
+
if secondary_insert:
statement = self.secondary.insert()
connection.execute(statement, secondary_insert)
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if not delete:
for state in deplist:
- (added, unchanged, deleted) = self.get_object_dependencies(state, uowcommit, passive=True)
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(state, self.key, passive=True)
if deleted:
for child in deleted:
if self.cascade.delete_orphan and self.hasparent(child) is False:
uowcommit.register_object(c._state, isdelete=True)
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
- dest = associationrow
- source = None
- if dest is None:
+ if associationrow is None:
return
self._verify_canload(child)
- self.syncrules.execute(source, dest, state, child, clearkeys)
+ self.syncrules.execute(None, associationrow, state, child, clearkeys)
class AssociationDP(OneToManyDP):
def __init__(self, *args, **kwargs):
self.polymorphic_on = polymorphic_on
self._eager_loaders = util.Set()
self._row_translators = {}
+ self._dependency_processors = []
# our 'polymorphic identity', a string name that when located in a result set row
# indicates this Mapper should be used to construct the object instance for that row.
return issubclass(state.class_, self.class_)
else:
return state.class_ is self.class_
-
- def _get_state_attr_by_column(self, state, column):
+
+ def _get_col_to_prop(self, column):
try:
- return self._columntoproperty[column].getattr(state, column)
+ return self._columntoproperty[column]
except KeyError:
prop = self.__props.get(column.key, None)
if prop:
raise exceptions.InvalidRequestError("Column '%s.%s' is not available, due to conflicting property '%s':%s" % (column.table.name, column.name, column.key, repr(prop)))
else:
raise exceptions.InvalidRequestError("No column %s.%s is configured on mapper %s..." % (column.table.name, column.name, str(self)))
+
+ def _get_state_attr_by_column(self, state, column):
+ return self._get_col_to_prop(column).getattr(state, column)
def _set_state_attr_by_column(self, state, column, value):
- return self._columntoproperty[column].setattr(state, value, column)
+ return self._get_col_to_prop(column).setattr(state, value, column)
def _get_attr_by_column(self, obj, column):
- return self._get_state_attr_by_column(obj._state, column)
+ return self._get_col_to_prop(column).getattr(obj._state, column)
+
+ def _get_committed_attr_by_column(self, obj, column):
+ return self._get_col_to_prop(column).getcommitted(obj._state, column)
def _set_attr_by_column(self, obj, column, value):
- self._set_state_attr_by_column(obj._state, column, value)
+ self._get_col_to_prop(column).setattr(obj._state, column, value)
def save_obj(self, states, uowtransaction, postupdate=False, post_update_cols=None, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects.
if self.__should_log_debug:
self.__log_debug("detected row switch for identity %s. will update %s, remove %s from transaction" % (instance_key, mapperutil.state_str(state), mapperutil.instance_str(existing)))
uowtransaction.set_row_switch(existing)
- if _state_has_identity(state):
- if state.dict['_instance_key'] != instance_key:
- raise exceptions.FlushError("Can't change the identity of instance %s in session (existing identity: %s; new identity: %s)" % (mapperutil.state_str(state), state.dict['_instance_key'], instance_key))
+# if _state_has_identity(state):
+# if state.dict['_instance_key'] != instance_key:
+# raise exceptions.FlushError("Can't change the identity of instance %s in session (existing identity: %s; new identity: %s)" % (mapperutil.state_str(state), state.dict['_instance_key'], instance_key))
inserted_objects = util.Set()
updated_objects = util.Set()
(added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True)
if added:
hasdata = True
- elif col in pks:
- params[col._label] = mapper._get_state_attr_by_column(state, col)
elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):
pass
else:
if post_update_cols is not None and col not in post_update_cols:
+ if col in pks:
+ params[col._label] = mapper._get_state_attr_by_column(state, col)
continue
+
prop = mapper._columntoproperty[col]
- (added, unchanged, deleted) = attributes.get_history(state, prop.key, passive=True)
+ (added, unchanged, deleted) = uowtransaction.get_attribute_history(state, prop.key, passive=True, cache=False)
if added:
if isinstance(added[0], sql.ClauseElement):
value_params[col] = added[0]
else:
params[col.key] = prop.get_col_value(col, added[0])
+ if col in pks:
+ if deleted:
+ params[col._label] = deleted[0]
+ else:
+ # row switch logic can reach us here
+ params[col._label] = added[0]
hasdata = True
+ elif col in pks:
+ params[col._label] = mapper._get_state_attr_by_column(state, col)
if hasdata:
update.append((state, params, mapper, connection, value_params))
if 'after_delete' in mapper.extension.methods:
mapper.extension.after_delete(mapper, connection, state.obj())
- def register_dependencies(self, uowcommit, *args, **kwargs):
+ def register_dependencies(self, uowcommit):
"""Register ``DependencyProcessor`` instances with a
``unitofwork.UOWTransaction``.
"""
for prop in self.__props.values():
- prop.register_dependencies(uowcommit, *args, **kwargs)
-
+ prop.register_dependencies(uowcommit)
+ for dep in self._dependency_processors:
+ dep.register_dependencies(uowcommit)
+
def cascade_iterator(self, type, state, recursive=None, halt_on=None):
"""Iterate each element and its mapper in an object graph,
for all relations that meet the given cascade rule.
def getattr(self, state, column):
return getattr(state.class_, self.key).impl.get(state)
+ def getcommitted(self, state, column):
+ return getattr(state.class_, self.key).impl.get_committed_value(state)
+
def setattr(self, state, value, column):
getattr(state.class_, self.key).impl.set(state, value, None)
obj = getattr(state.class_, self.key).impl.get(state)
return self.get_col_value(column, obj)
+ def getcommitted(self, state, column):
+ obj = getattr(state.class_, self.key).impl.get_committed_value(state)
+ return self.get_col_value(column, obj)
+
def setattr(self, state, value, column):
# TODO: test coverage for this method
obj = getattr(state.class_, self.key).impl.get(state)
of items that correspond to a related database table.
"""
- def __init__(self, argument, secondary=None, primaryjoin=None, secondaryjoin=None, entity_name=None, foreign_keys=None, foreignkey=None, uselist=None, private=False, association=None, order_by=False, attributeext=None, backref=None, is_backref=False, post_update=False, cascade=None, viewonly=False, lazy=True, collection_class=None, passive_deletes=False, remote_side=None, enable_typechecks=True, join_depth=None, strategy_class=None):
+ def __init__(self, argument, secondary=None, primaryjoin=None, secondaryjoin=None, entity_name=None, foreign_keys=None, foreignkey=None, uselist=None, private=False, association=None, order_by=False, attributeext=None, backref=None, is_backref=False, post_update=False, cascade=None, viewonly=False, lazy=True, collection_class=None, passive_deletes=False, passive_updates=True, remote_side=None, enable_typechecks=True, join_depth=None, strategy_class=None):
self.uselist = uselist
self.argument = argument
self.entity_name = entity_name
util.warn_deprecated('foreignkey option is deprecated; see docs for details')
self.collection_class = collection_class
self.passive_deletes = passive_deletes
+ self.passive_updates = passive_updates
self.remote_side = util.to_set(remote_side)
self.enable_typechecks = enable_typechecks
self._parent_join_cache = {}
# just a string was sent
if secondary is not None:
# reverse primary/secondary in case of a many-to-many
- self.backref = BackRef(backref, primaryjoin=secondaryjoin, secondaryjoin=primaryjoin)
+ self.backref = BackRef(backref, primaryjoin=secondaryjoin, secondaryjoin=primaryjoin, passive_updates=self.passive_updates)
else:
- self.backref = BackRef(backref, primaryjoin=primaryjoin, secondaryjoin=secondaryjoin)
+ self.backref = BackRef(backref, primaryjoin=primaryjoin, secondaryjoin=secondaryjoin, passive_updates=self.passive_updates)
else:
self.backref = backref
self.is_backref = is_backref
def create_statement(instance):
params = {}
for (c, bind) in param_names:
- params[bind] = mapper._get_attr_by_column(instance, c)
+ # use the "committed" (database) version to get query column values
+ params[bind] = mapper._get_committed_attr_by_column(instance, c)
return (statement, params)
def new_execute(instance, row, isnew, **flags):
def visit_bindparam(bindparam):
mapper = reverse_direction and self.parent_property.mapper or self.parent_property.parent
if bindparam.key in bind_to_col:
- bindparam.value = mapper._get_attr_by_column(instance, bind_to_col[bindparam.key])
+ # use the "committed" (database) version to get query column values
+ bindparam.value = mapper._get_committed_attr_by_column(instance, bind_to_col[bindparam.key])
return visitors.traverse(criterion, clone=True, visit_bindparam=visit_bindparam)
def setup_loader(self, instance, options=None, path=None):
if self.use_get:
params = {}
for col, bind in self.lazybinds.iteritems():
- params[bind.key] = self.parent._get_attr_by_column(instance, col)
+ # use the "committed" (database) version to get query column values
+ params[bind.key] = self.parent._get_committed_attr_by_column(instance, col)
ident = []
nonnulls = False
for primary_key in self.select_mapper.primary_key:
from sqlalchemy import schema, exceptions, util
from sqlalchemy.sql import visitors, operators
from sqlalchemy import logging
-from sqlalchemy.orm import util as mapperutil
+from sqlalchemy.orm import util as mapperutil, attributes
ONETOMANY = 0
MANYTOONE = 1
def dest_columns(self):
return [r.dest_column for r in self.syncrules if r.dest_column is not None]
+ def update(self, dest, parent, child, old_prefix):
+ for rule in self.syncrules:
+ rule.update(dest, parent, child, old_prefix)
+
def execute(self, source, dest, obj=None, child=None, clearkeys=None):
for rule in self.syncrules:
rule.execute(source, dest, obj, child, clearkeys)
-
+
+ def source_changes(self, uowcommit, source):
+ for rule in self.syncrules:
+ if rule.source_changes(uowcommit, source):
+ return True
+ return False
+
class SyncRule(object):
"""An instruction indicating how to populate the objects on each
side of a relationship.
except AttributeError:
self._dest_primary_key = self.dest_mapper is not None and self.dest_column in self.dest_mapper._pks_by_table[self.dest_column.table] and not self.dest_mapper.allow_null_pks
return self._dest_primary_key
-
+
+ def source_changes(self, uowcommit, source):
+ prop = self.source_mapper._columntoproperty[self.source_column]
+ (added, unchanged, deleted) = uowcommit.get_attribute_history(source, prop.key, passive=True)
+ return bool(added)
+
+ def update(self, dest, parent, child, old_prefix):
+ if self.issecondary is False:
+ source = parent
+ elif self.issecondary is True:
+ source = child
+ oldvalue = self.source_mapper._get_committed_attr_by_column(source.obj(), self.source_column)
+ value = self.source_mapper._get_state_attr_by_column(source, self.source_column)
+ dest[self.dest_column.key] = value
+ dest[old_prefix + self.dest_column.key] = oldvalue
+
def execute(self, source, dest, parent, child, clearkeys):
+ # TODO: break the "dictionary" case into a separate method like 'update' above,
+ # reduce conditionals
if source is None:
if self.issecondary is False:
source = parent
"""register the given object as 'clean' (i.e. persistent) within this unit of work, after
a save operation has taken place."""
+ mapper = _state_mapper(state)
+ instance_key = mapper._identity_key_from_state(state)
+
if '_instance_key' not in state.dict:
- mapper = _state_mapper(state)
- state.dict['_instance_key'] = mapper._identity_key_from_state(state)
+ state.dict['_instance_key'] = instance_key
+
+ elif state.dict['_instance_key'] != instance_key:
+ # primary key switch
+ self.identity_map[instance_key] = state.obj()
+ del self.identity_map[state.dict['_instance_key']]
+ state.dict['_instance_key'] = instance_key
+
if hasattr(state, 'insert_order'):
delattr(state, 'insert_order')
self.identity_map[state.dict['_instance_key']] = state.obj()
self.attributes = {}
self.logger = logging.instance_logger(self, echoflag=session.echo_uow)
+
+ def get_attribute_history(self, state, key, passive=True, cache=True):
+ hashkey = ("history", state, key)
+
+ # cache the objects, not the states; the strong reference here
+ # prevents newly loaded objects from being dereferenced during the
+ # flush process
+ if cache and hashkey in self.attributes:
+ (added, unchanged, deleted, cached_passive) = self.attributes[hashkey]
+ # if the cached lookup was "passive" and now we want non-passive, do a non-passive
+ # lookup and re-cache
+ if cached_passive and not passive:
+ (added, unchanged, deleted) = attributes.get_history(state, key, passive=False)
+ self.attributes[hashkey] = (added, unchanged, deleted, passive)
+ else:
+ (added, unchanged, deleted) = attributes.get_history(state, key, passive=passive)
+ self.attributes[hashkey] = (added, unchanged, deleted, passive)
+
+ if added is None:
+ return (added, unchanged, deleted)
+ else:
+ return (
+ [getattr(c, '_state', c) for c in added],
+ [getattr(c, '_state', c) for c in unchanged],
+ [getattr(c, '_state', c) for c in deleted],
+ )
+
def register_object(self, state, isdelete = False, listonly = False, postupdate=False, post_update_cols=None, **kwargs):
# if object is not in the overall session, do nothing
task = self.get_task_by_mapper(mapper)
targettask = self.get_task_by_mapper(mapperfrom)
up = UOWDependencyProcessor(processor, targettask)
- task._dependencies.add(up)
+ task.dependencies.add(up)
def execute(self):
"""Execute this UOWTransaction.
# mapping of InstanceState -> UOWTaskElement
self._objects = {}
- self._dependencies = util.Set()
+ self.dependencies = util.Set()
self.cyclical_dependencies = util.Set()
def polymorphic_tasks(self):
used only for debugging output.
"""
- return not self._objects and not self._dependencies
+ return not self._objects and not self.dependencies
def append(self, state, listonly=False, isdelete=False):
if state not in self._objects:
polymorphic_todelete_objects = property(lambda self:[rec.state for rec in self.polymorphic_elements
if rec.state is not None and not rec.listonly and rec.isdelete is True])
- dependencies = property(lambda self:self._dependencies)
-
polymorphic_dependencies = _polymorphic_collection(lambda task:task.dependencies)
polymorphic_cyclical_dependencies = _polymorphic_collection(lambda task:task.cyclical_dependencies)
object_to_original_task[state] = subtask
for dep in deps_by_targettask.get(subtask, []):
# is this dependency involved in one of the cycles ?
- if not dependency_in_cycles(dep):
+ # (don't count the DetectKeySwitch prop)
+ if dep.processor.no_dependencies or not dependency_in_cycles(dep):
continue
(processor, targettask) = (dep.processor, dep.targettask)
isdelete = taskelement.isdelete
# stick the non-circular dependencies onto the new UOWTask
for d in extradeplist:
- t._dependencies.add(d)
+ t.dependencies.add(d)
if head is not None:
make_task_tree(head, t, {})
for state in t2.elements:
localtask.append(obj, t2.listonly, isdelete=t2._objects[state].isdelete)
for dep in t2.dependencies:
- localtask._dependencies.add(dep)
+ localtask.dependencies.add(dep)
ret.insert(0, localtask)
return ret
self.processor.process_dependencies(self.targettask, [elem.state for elem in self.targettask.polymorphic_todelete_elements if elem.state is not None], trans, delete=True)
def get_object_dependencies(self, state, trans, passive):
- return self.processor.get_object_dependencies(state, trans, passive=passive)
+ return trans.get_attribute_history(state, self.processor.key, passive=passive)
def whose_dependent_on_who(self, state1, state2):
"""establish which object is operationally dependent amongst a parent/child
'orm.generative',
'orm.lazytest1',
'orm.assorted_eager',
-
+
+ 'orm.naturalpks',
'orm.sessioncontext',
'orm.unitofwork',
'orm.session',
--- /dev/null
+"""Test primary key changing capabilities and passive/non-passive cascading updates."""
+
+import testbase
+from sqlalchemy import *
+from sqlalchemy.orm import *
+from sqlalchemy import exceptions
+
+from testlib.fixtures import *
+from testlib import *
+
+class NaturalPKTest(ORMTest):
+ def define_tables(self, metadata):
+ global users, addresses, items, users_to_items
+
+ users = Table('users', metadata,
+ Column('username', String(50), primary_key=True),
+ Column('fullname', String(100)))
+
+ addresses = Table('addresses', metadata,
+ Column('email', String(50), primary_key=True),
+ Column('username', String(50), ForeignKey('users.username', onupdate="cascade")))
+
+ items = Table('items', metadata,
+ Column('itemname', String(50), primary_key=True),
+ Column('description', String(100)))
+
+ users_to_items = Table('userstoitems', metadata,
+ Column('username', String(50), ForeignKey('users.username', onupdate='cascade'), primary_key=True),
+ Column('itemname', String(50), ForeignKey('items.itemname', onupdate='cascade'), primary_key=True),
+ )
+
+ def test_entity(self):
+ mapper(User, users)
+
+ sess = create_session()
+ u1 = User(username='jack', fullname='jack')
+
+ sess.save(u1)
+ sess.flush()
+ assert sess.get(User, 'jack') is u1
+
+ u1.username = 'ed'
+ sess.flush()
+
+ def go():
+ assert sess.get(User, 'ed') is u1
+ self.assert_sql_count(testbase.db, go, 0)
+
+ assert sess.get(User, 'jack') is None
+
+ sess.clear()
+ u1 = sess.query(User).get('ed')
+ self.assertEquals(User(username='ed', fullname='jack'), u1)
+
+    @testing.unsupported('sqlite', 'mysql')
+ def test_onetomany_passive(self):
+ self._test_onetomany(True)
+
+ def test_onetomany_nonpassive(self):
+ self._test_onetomany(False)
+
+ def _test_onetomany(self, passive_updates):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, passive_updates=passive_updates)
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u1 = User(username='jack', fullname='jack')
+ u1.addresses.append(Address(email='jack1'))
+ u1.addresses.append(Address(email='jack2'))
+ sess.save(u1)
+ sess.flush()
+
+ assert sess.get(Address, 'jack1') is u1.addresses[0]
+
+ u1.username = 'ed'
+ sess.flush()
+ assert u1.addresses[0].username == 'ed'
+
+ sess.clear()
+ self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+
+ u1 = sess.get(User, 'ed')
+ u1.username = 'jack'
+ def go():
+ sess.flush()
+ if not passive_updates:
+ self.assert_sql_count(testbase.db, go, 4) # test passive_updates=False; load addresses, update user, update 2 addresses
+ else:
+ self.assert_sql_count(testbase.db, go, 1) # test passive_updates=True; update user
+ sess.clear()
+ assert User(username='jack', addresses=[Address(username='jack'), Address(username='jack')]) == sess.get(User, 'jack')
+
+ u1 = sess.get(User, 'jack')
+ u1.addresses = []
+ u1.username = 'fred'
+ sess.flush()
+ sess.clear()
+ assert sess.get(Address, 'jack1').username is None
+ u1 = sess.get(User, 'fred')
+ self.assertEquals(User(username='fred', fullname='jack'), u1)
+
+ @testing.unsupported('sqlite', 'mysql')
+ def test_manytoone_passive(self):
+ self._test_manytoone(True)
+
+ def test_manytoone_nonpassive(self):
+ self._test_manytoone(False)
+
+ def _test_manytoone(self, passive_updates):
+ mapper(User, users)
+ mapper(Address, addresses, properties={
+ 'user':relation(User, passive_updates=passive_updates)
+ })
+
+ sess = create_session()
+ a1 = Address(email='jack1')
+ a2 = Address(email='jack2')
+
+ u1 = User(username='jack', fullname='jack')
+ a1.user = u1
+ a2.user = u1
+ sess.save(a1)
+ sess.save(a2)
+ sess.flush()
+
+ u1.username = 'ed'
+
+ def go():
+ sess.flush()
+ if passive_updates:
+ self.assert_sql_count(testbase.db, go, 1)
+ else:
+ self.assert_sql_count(testbase.db, go, 3)
+
+ def go():
+ sess.flush()
+ self.assert_sql_count(testbase.db, go, 0)
+
+ assert a1.username == a2.username == 'ed'
+ sess.clear()
+ self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+
+ @testing.unsupported('sqlite', 'mysql')
+ def test_bidirectional_passive(self):
+ self._test_bidirectional(True)
+
+ def test_bidirectional_nonpassive(self):
+ self._test_bidirectional(False)
+
+ def _test_bidirectional(self, passive_updates):
+ mapper(User, users)
+ mapper(Address, addresses, properties={
+ 'user':relation(User, passive_updates=passive_updates, backref='addresses')
+ })
+
+ sess = create_session()
+ a1 = Address(email='jack1')
+ a2 = Address(email='jack2')
+
+ u1 = User(username='jack', fullname='jack')
+ a1.user = u1
+ a2.user = u1
+ sess.save(a1)
+ sess.save(a2)
+ sess.flush()
+
+ u1.username = 'ed'
+ (ad1, ad2) = sess.query(Address).all()
+ self.assertEquals([Address(username='jack'), Address(username='jack')], [ad1, ad2])
+ def go():
+ sess.flush()
+ if passive_updates:
+ self.assert_sql_count(testbase.db, go, 1)
+ else:
+ self.assert_sql_count(testbase.db, go, 3)
+ self.assertEquals([Address(username='ed'), Address(username='ed')], [ad1, ad2])
+ sess.clear()
+ self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+
+ u1 = sess.get(User, 'ed')
+ assert len(u1.addresses) == 2 # load addresses
+ u1.username = 'fred'
+ def go():
+ sess.flush()
+        # check that passive_updates takes effect on the other side of the backref as well
+ if passive_updates:
+ self.assert_sql_count(testbase.db, go, 1)
+ else:
+ self.assert_sql_count(testbase.db, go, 3)
+ sess.clear()
+ self.assertEquals([Address(username='fred'), Address(username='fred')], sess.query(Address).all())
+
+
+ @testing.unsupported('sqlite', 'mysql')
+ def test_manytomany_passive(self):
+ self._test_manytomany(True)
+
+ def test_manytomany_nonpassive(self):
+ self._test_manytomany(False)
+
+ def _test_manytomany(self, passive_updates):
+ mapper(User, users, properties={
+ 'items':relation(Item, secondary=users_to_items, backref='users', passive_updates=passive_updates)
+ })
+ mapper(Item, items)
+
+ sess = create_session()
+ u1 = User(username='jack')
+ u2 = User(username='fred')
+ i1 = Item(itemname='item1')
+ i2 = Item(itemname='item2')
+
+ u1.items.append(i1)
+ u1.items.append(i2)
+ i2.users.append(u2)
+ sess.save(u1)
+ sess.save(u2)
+ sess.flush()
+
+ r = sess.query(Item).all()
+ # fixtures.Base can't handle a comparison with the backrefs involved....
+ self.assertEquals(Item(itemname='item1'), r[0])
+ self.assertEquals(['jack'], [u.username for u in r[0].users])
+ self.assertEquals(Item(itemname='item2'), r[1])
+ self.assertEquals(['jack', 'fred'], [u.username for u in r[1].users])
+
+        u2.username = 'ed'
+        sess.flush()  # cascades the username change to the users_to_items rows
+        def go():
+            sess.flush()
+        self.assert_sql_count(testbase.db, go, 0)
+
+ sess.clear()
+ r = sess.query(Item).all()
+ self.assertEquals(Item(itemname='item1'), r[0])
+ self.assertEquals(['jack'], [u.username for u in r[0].users])
+ self.assertEquals(Item(itemname='item2'), r[1])
+ self.assertEquals(['ed', 'jack'], sorted([u.username for u in r[1].users]))
+
+class SelfRefTest(ORMTest):
+ def define_tables(self, metadata):
+ global nodes, Node
+
+ nodes = Table('nodes', metadata,
+ Column('name', String(50), primary_key=True),
+ Column('parent', String(50), ForeignKey('nodes.name', onupdate='cascade'))
+ )
+
+ class Node(Base):
+ pass
+
+ def test_onetomany(self):
+ mapper(Node, nodes, properties={
+ 'children':relation(Node, backref=backref('parentnode', remote_side=nodes.c.name, passive_updates=False), passive_updates=False)
+ })
+
+ sess = create_session()
+ n1 = Node(name='n1')
+ n1.children.append(Node(name='n11'))
+ n1.children.append(Node(name='n12'))
+ n1.children.append(Node(name='n13'))
+ sess.save(n1)
+ sess.flush()
+
+ n1.name = 'new n1'
+ sess.flush()
+ self.assertEquals(n1.children[1].parent, 'new n1')
+ self.assertEquals(['new n1', 'new n1', 'new n1'], [n.parent for n in sess.query(Node).filter(Node.name.in_(['n11', 'n12', 'n13']))])
+
+
+class NonPKCascadeTest(ORMTest):
+ def define_tables(self, metadata):
+ global users, addresses
+
+ users = Table('users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('username', String(50), unique=True),
+ Column('fullname', String(100)))
+
+ addresses = Table('addresses', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('email', String(50)),
+ Column('username', String(50), ForeignKey('users.username', onupdate="cascade")))
+
+    @testing.unsupported('sqlite', 'mysql')
+ def test_onetomany_passive(self):
+ self._test_onetomany(True)
+
+ def test_onetomany_nonpassive(self):
+ self._test_onetomany(False)
+
+ def _test_onetomany(self, passive_updates):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, passive_updates=passive_updates)
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u1 = User(username='jack', fullname='jack')
+ u1.addresses.append(Address(email='jack1'))
+ u1.addresses.append(Address(email='jack2'))
+ sess.save(u1)
+ sess.flush()
+ a1 = u1.addresses[0]
+
+ assert sess.get(Address, a1.id) is u1.addresses[0]
+
+ u1.username = 'ed'
+ sess.flush()
+ assert u1.addresses[0].username == 'ed'
+
+ sess.clear()
+ self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+
+ u1 = sess.get(User, u1.id)
+ u1.username = 'jack'
+ def go():
+ sess.flush()
+ if not passive_updates:
+ self.assert_sql_count(testbase.db, go, 4) # test passive_updates=False; load addresses, update user, update 2 addresses
+ else:
+ self.assert_sql_count(testbase.db, go, 1) # test passive_updates=True; update user
+ sess.clear()
+ assert User(username='jack', addresses=[Address(username='jack'), Address(username='jack')]) == sess.get(User, u1.id)
+
+ u1 = sess.get(User, u1.id)
+ u1.addresses = []
+ u1.username = 'fred'
+ sess.flush()
+ sess.clear()
+ assert sess.get(Address, a1.id).username is None
+ u1 = sess.get(User, u1.id)
+ self.assertEquals(User(username='fred', fullname='jack'), u1)
+
+
+if __name__ == '__main__':
+ testbase.main()
+
+
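The `passive_updates=True` paths above lean entirely on the database propagating the primary key change via ON UPDATE CASCADE, which is why the tests mark `sqlite` and `mysql` (MyISAM) as unsupported for those variants. As an illustrative sketch (not part of this changeset), the database-side behavior can be demonstrated with the stdlib `sqlite3` module, since modern SQLite does enforce cascades once the `foreign_keys` pragma is enabled:

```python
import sqlite3

# Sketch of the behavior passive_updates=True depends on: the database itself
# cascades a primary key change to dependent rows via ON UPDATE CASCADE.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite requires this per-connection
conn.execute("CREATE TABLE users (username TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE addresses ("
    " email TEXT PRIMARY KEY,"
    " username TEXT REFERENCES users(username) ON UPDATE CASCADE)"
)
conn.execute("INSERT INTO users VALUES ('jack')")
conn.execute("INSERT INTO addresses VALUES ('jack1', 'jack')")

# Change the primary key; the dependent addresses row follows automatically.
conn.execute("UPDATE users SET username = 'ed' WHERE username = 'jack'")
cascaded = conn.execute("SELECT username FROM addresses").fetchone()[0]
print(cascaded)  # prints: ed

# With passive_updates=False, the ORM cannot rely on the database and instead
# issues the equivalent statement itself, e.g.:
#   UPDATE addresses SET username = 'ed' WHERE username = 'jack'
conn.close()
```

This is also why the non-passive tests expect extra statements in `assert_sql_count`: one UPDATE per dependent row (plus the load of the collection), versus a single UPDATE of the parent when the database cascades.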
e.data = 'some more data'
Session.commit()
- @engines.assert_conns_closed
- def test_pksimmutable(self):
- class Entry(object):
- pass
- mapper(Entry, table)
- e = Entry()
- e.multi_id=5
- e.multi_rev=5
- e.name='somename'
- Session.commit()
- e.multi_rev=6
- e.name = 'someothername'
- try:
- Session.commit()
- assert False
- except exceptions.FlushError, fe:
- assert str(fe) == "Can't change the identity of instance Entry@%s in session (existing identity: (%s, (5, 5), None); new identity: (%s, (5, 6), None))" % (hex(id(e)), repr(e.__class__), repr(e.__class__)), str(fe)
-
class ForeignPKTest(ORMTest):
"""tests mapper detection of the relationship direction when parent/child tables are joined on their
primary keys"""