def process_saves(self, uowcommit, states):
pass
- def whose_dependent_on_who(self, state1, state2):
- """Given an object pair assuming `obj2` is a child of `obj1`,
- return a tuple with the dependent object second, or None if
- there is no dependency.
-
- """
- if state1 is state2:
- return None
- elif self.direction == ONETOMANY:
- return (state1, state2)
- else:
- return (state2, state1)
-
def _verify_canload(self, state):
if state is not None and \
not self.mapper._canload(state, allow_subtypes=not self.enable_typechecks):
uow.dependencies.update([
(child_action, before_delete),
(before_delete, delete_parent),
- (child_action, delete_parent)
])
def presort_deletes(self, uowcommit, states):
(parent_deletes, child_deletes)
])
+ def per_state_dependencies(self, uow,
+ save_parent,
+ delete_parent,
+ child_action,
+ after_save, before_delete, isdelete):
+ if not isdelete:
+ uow.dependencies.update([
+ (child_action, after_save),
+ (after_save, save_parent),
+ ])
+ else:
+ uow.dependencies.update([
+ (delete_parent, child_action)
+ ])
+
def presort_deletes(self, uowcommit, states):
if self.cascade.delete or self.cascade.delete_orphan:
for state in states:
testing.db,
sess.flush,
AllOf(
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", {'id':3}),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", {'id':2}),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx:{'id':n3.id}),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id':n2.id}),
),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", {'id':1})
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id':n1.id})
)
def test_one_to_many_delete_parent(self):
testing.db,
sess.flush,
AllOf(
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id", {'nodes_id':3, 'parent_id':None}),
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id", {'nodes_id':2, 'parent_id':None}),
+ CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id",
+ lambda ctx: {'nodes_id':n3.id, 'parent_id':None}),
+ CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id",
+ lambda ctx: {'nodes_id':n2.id, 'parent_id':None}),
),
- CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id", {'id':1})
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx:{'id':n1.id})
)
+ def test_many_to_one_save(self):
+ mapper(Node, nodes, properties={
+ 'parent':relationship(Node, remote_side=nodes.c.id)
+ })
+ sess = create_session()
+
+ n1 = Node(data='n1')
+ n2, n3 = Node(data='n2', parent=n1), Node(data='n3', parent=n1)
+
+ sess.add_all([n2, n3])
+
+ self.assert_sql_execution(
+ testing.db,
+ sess.flush,
+
+ CompiledSQL(
+ "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ {'parent_id': None, 'data': 'n1'}
+ ),
+ AllOf(
+ CompiledSQL(
+ "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ lambda ctx: {'parent_id': n1.id, 'data': 'n2'}
+ ),
+ CompiledSQL(
+ "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ lambda ctx: {'parent_id': n1.id, 'data': 'n3'}
+ ),
+ )
+ )