(after_save, save_parent),
])
else:
- uow.dependencies.update([
- (delete_parent, child_action)
- ])
+ if isinstance(child_action, unitofwork.DeleteState):
+ uow.dependencies.update([
+ (delete_parent, child_action)
+ ])
def presort_deletes(self, uowcommit, states):
if self.cascade.delete or self.cascade.delete_orphan:
self.dependencies.remove(edge)
for dep in convert[edge[1]]:
self.dependencies.add((edge[0], dep))
+ elif edge[0].disabled or edge[1].disabled:
+ self.dependencies.remove(edge)
# remove actions that were part of the cycles,
# or have been marked as "disabled" by the "breaking up"
# execute actions
sort = topological.sort(self.dependencies, self.postsort_actions.values())
+ print "------------------------"
+# import pdb
+# pdb.set_trace()
print self.dependencies
print sort
for rec in sort:
self.dependency_processor.process_deletes(uow, [self.state])
else:
self.dependency_processor.process_saves(uow, [self.state])
+
+ def __repr__(self):
+ return "%s(%s, %s, delete=%s)" % (
+ self.__class__.__name__,
+ self.dependency_processor,
+ mapperutil.state_str(self.state),
+ self.delete
+ )
class SaveUpdateState(PostSortRec):
def __init__(self, uow, state):
output = []
while nodes:
if not queue:
- raise CircularDependencyError("Circular dependency detected: %r" % edges)
+ raise CircularDependencyError("Circular dependency detected: cycles: %r all edges: %r" %
+ (find_cycles(tuples, allitems), edges))
node = queue.pop()
output.append(node)
nodes.remove(node)
backref=backref('parent', remote_side=[item.c.uuid]))})
@testing.resolve_artifact_names
- def testbasic(self):
+ def test_basic(self):
t1 = TT()
t1.children.append(TT())
t1.children.append(TT())
eq_(t.children[0].parent_uuid, t1.uuid)
@testing.resolve_artifact_names
- def testlazyclause(self):
+ def test_lazy_clause(self):
s = create_session()
t1 = TT()
t2 = TT()
primaryjoin=child2.c.child1_id == child1.c.id)))
@testing.resolve_artifact_names
- def testmanytooneonly(self):
+ def test_many_to_one_only(self):
"""test similar to SelfReferentialTest.testmanytooneonly"""
session = create_session()
sess.add(o3)
sess.flush()
-
@testing.resolve_artifact_names
def test_reflush_2(self):
"""A variant on test_reflush()"""
pass
@testing.resolve_artifact_names
- def testcycle(self):
+ def test_cycle(self):
mapper(C2, t2, properties={
'c1s': relationship(C1,
primaryjoin=t2.c.c1 == t1.c.c2,
class BiDirectionalOneToManyTest2(_base.MappedTest):
- """Two mappers with a one-to-many relationship to each other, with a second one-to-many on one of the mappers"""
+ """Two mappers with a one-to-many relationship to each other,
+ with a second one-to-many on one of the mappers"""
run_define_tables = 'each'
'data': relationship(mapper(C1Data, t1_data))})
@testing.resolve_artifact_names
- def testcycle(self):
+ def test_cycle(self):
a = C1()
b = C2()
c = C1()
pass
@testing.resolve_artifact_names
- def testcycle(self):
+ def test_cycle(self):
"""
This test has a peculiar aspect in that it doesnt create as many
dependent relationships as the other tests, and revealed a small
sess.flush()
@testing.resolve_artifact_names
- def testpostupdate_m2o(self):
+ def test_post_update_m2o(self):
"""A cycle between two rows, with a post_update on the many-to-one"""
mapper(Ball, ball)
mapper(Person, person, properties=dict(
)
@testing.resolve_artifact_names
- def testpostupdate_o2m(self):
+ def test_post_update_o2m(self):
"""A cycle between two rows, with a post_update on the one-to-many"""
mapper(Ball, ball)
self.path = path
@testing.resolve_artifact_names
- def testbasic(self):
+ def test_basic(self):
"""Post_update only fires off when needed.
This test case used to produce many superfluous update statements,
pass
@testing.resolve_artifact_names
- def testbasic(self):
+ def test_basic(self):
"""
Test that post_update remembers to be involved in update operations as
well, since it replaces the normal dependency processing completely
),
)
)
+
+ def test_many_to_one_delete_all(self):
+ mapper(Node, nodes, properties={
+ 'parent':relationship(Node, remote_side=nodes.c.id)
+ })
+ sess = create_session()
+
+ n1 = Node(data='n1')
+ n2, n3 = Node(data='n2', parent=n1), Node(data='n3', parent=n1)
+
+ sess.add_all([n2, n3])
+ sess.flush()
+
+ sess.delete(n1)
+ sess.delete(n2)
+ sess.delete(n3)
+ self.assert_sql_execution(
+ testing.db,
+ sess.flush,
+ AllOf(
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx:{'id':n3.id}),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id':n2.id}),
+ ),
+ CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
+ lambda ctx: {'id':n1.id})
+ )
+
+ def test_bidirectional_mutations_one(self):
+ mapper(Node, nodes, properties={
+ 'children':relationship(Node, backref=backref('parent', remote_side=nodes.c.id))
+ })
+ sess = create_session()
+
+ n2, n3 = Node(data='n2', children=[]), Node(data='n3', children=[])
+ n1 = Node(data='n1', children=[n2])
+ sess.add(n1)
+ sess.flush()
+ sess.delete(n2)
+ n1.children.append(n3)
+ sess.flush()
+
+ sess.delete(n1)
+ sess.delete(n3)
+ sess.flush()
\ No newline at end of file