filtered_heads = self.revision_map.filter_for_lineage(
heads, revision, include_dependencies=True)
- dest = self.get_revision(revision)
-
- if dest is None:
- # dest is 'base'. Return a "delete branch" migration
- # for all applicable heads.
- return [
- migration.StampStep(head.revision, None, False, True)
- for head in filtered_heads
- ]
- elif dest in filtered_heads:
- # the dest is already in the version table, do nothing.
- return []
-
- # figure out if the dest is a descendant or an
- # ancestor of the selected nodes
- descendants = set(self.revision_map._get_descendant_nodes([dest]))
- ancestors = set(self.revision_map._get_ancestor_nodes([dest]))
-
- if descendants.intersection(filtered_heads):
- # heads are above the target, so this is a downgrade.
- # we can treat them as a "merge", single step.
- assert not ancestors.intersection(filtered_heads)
- todo_heads = [head.revision for head in filtered_heads]
- step = migration.StampStep(
- todo_heads, dest.revision, False, False)
- return [step]
- elif ancestors.intersection(filtered_heads):
- # heads are below the target, so this is an upgrade.
- # we can treat them as a "merge", single step.
- todo_heads = [head.revision for head in filtered_heads]
- step = migration.StampStep(
- todo_heads, dest.revision, True, False)
- return [step]
- else:
- # destination is in a branch not represented,
- # treat it as new branch
- step = migration.StampStep((), dest.revision, True, True)
- return [step]
+ steps = []
+
+ dests = self.get_revisions(revision) or [None]
+ for dest in dests:
+ if dest is None:
+ # dest is 'base'. Return a "delete branch" migration
+ # for all applicable heads.
+ steps.extend([
+ migration.StampStep(head.revision, None, False, True)
+ for head in filtered_heads
+ ])
+ continue
+ elif dest in filtered_heads:
+ # the dest is already in the version table, do nothing.
+ continue
+
+ # figure out if the dest is a descendant or an
+ # ancestor of the selected nodes
+ descendants = set(
+ self.revision_map._get_descendant_nodes([dest]))
+ ancestors = set(self.revision_map._get_ancestor_nodes([dest]))
+
+ if descendants.intersection(filtered_heads):
+ # heads are above the target, so this is a downgrade.
+ # we can treat them as a "merge", single step.
+ assert not ancestors.intersection(filtered_heads)
+ todo_heads = [head.revision for head in filtered_heads]
+ step = migration.StampStep(
+ todo_heads, dest.revision, False, False)
+ steps.append(step)
+ continue
+ elif ancestors.intersection(filtered_heads):
+ # heads are below the target, so this is an upgrade.
+ # we can treat them as a "merge", single step.
+ todo_heads = [head.revision for head in filtered_heads]
+ step = migration.StampStep(
+ todo_heads, dest.revision, True, False)
+ steps.append(step)
+ continue
+ else:
+ # destination is in a branch not represented,
+ # treat it as new branch
+ step = migration.StampStep((), dest.revision, True, True)
+ steps.append(step)
+ continue
+ return steps
def run_env(self):
"""Run the script environment.
self.env._downgrade_revs, self.c.revision[0:4], self.b.revision
)
+ def test_stamp_to_base(self):
+ revs = self.env._stamp_revs("base", self.d.revision)
+ eq_(len(revs), 1)
+ assert revs[0].should_delete_branch
+ eq_(revs[0].delete_version_num, self.d.revision)
+
class BranchedPathTest(MigrationTest):
self.up_(self.a1), self.up_(self.b1)]
)
+ def test_stamp_to_heads(self):
+ a1, b1, a2, b2 = self.a1, self.b1, self.a2, self.b2
+ revs = self.env._stamp_revs("heads", ())
+ eq_(len(revs), 2)
+ #import pdb
+ #pdb.set_trace()
+ eq_(
+ set(r.to_revisions for r in revs),
+ set([(self.b1.revision,), (self.b2.revision,)])
+ )
+
class MergedPathTest(MigrationTest):