]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- Added support for "alembic stamp" to work when given "heads" as an
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 23 Jan 2015 23:46:53 +0000 (18:46 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 23 Jan 2015 23:46:53 +0000 (18:46 -0500)
argument, when multiple heads are present.
fixes #267

alembic/script.py
docs/build/changelog.rst
tests/test_version_traversal.py

index 763d0487d23d3c6dedf1ab8cea53a0c327eb7902..095a04ba75c69d6eb88f9623466466a97bbc5657 100644 (file)
@@ -331,44 +331,52 @@ class ScriptDirectory(object):
             filtered_heads = self.revision_map.filter_for_lineage(
                 heads, revision, include_dependencies=True)
 
-            dest = self.get_revision(revision)
-
-            if dest is None:
-                # dest is 'base'.  Return a "delete branch" migration
-                # for all applicable heads.
-                return [
-                    migration.StampStep(head.revision, None, False, True)
-                    for head in filtered_heads
-                ]
-            elif dest in filtered_heads:
-                # the dest is already in the version table, do nothing.
-                return []
-
-            # figure out if the dest is a descendant or an
-            # ancestor of the selected nodes
-            descendants = set(self.revision_map._get_descendant_nodes([dest]))
-            ancestors = set(self.revision_map._get_ancestor_nodes([dest]))
-
-            if descendants.intersection(filtered_heads):
-                # heads are above the target, so this is a downgrade.
-                # we can treat them as a "merge", single step.
-                assert not ancestors.intersection(filtered_heads)
-                todo_heads = [head.revision for head in filtered_heads]
-                step = migration.StampStep(
-                    todo_heads, dest.revision, False, False)
-                return [step]
-            elif ancestors.intersection(filtered_heads):
-                # heads are below the target, so this is an upgrade.
-                # we can treat them as a "merge", single step.
-                todo_heads = [head.revision for head in filtered_heads]
-                step = migration.StampStep(
-                    todo_heads, dest.revision, True, False)
-                return [step]
-            else:
-                # destination is in a branch not represented,
-                # treat it as new branch
-                step = migration.StampStep((), dest.revision, True, True)
-                return [step]
+            steps = []
+
+            dests = self.get_revisions(revision) or [None]
+            for dest in dests:
+                if dest is None:
+                    # dest is 'base'.  Return a "delete branch" migration
+                    # for all applicable heads.
+                    steps.extend([
+                        migration.StampStep(head.revision, None, False, True)
+                        for head in filtered_heads
+                    ])
+                    continue
+                elif dest in filtered_heads:
+                    # the dest is already in the version table, do nothing.
+                    continue
+
+                # figure out if the dest is a descendant or an
+                # ancestor of the selected nodes
+                descendants = set(
+                    self.revision_map._get_descendant_nodes([dest]))
+                ancestors = set(self.revision_map._get_ancestor_nodes([dest]))
+
+                if descendants.intersection(filtered_heads):
+                    # heads are above the target, so this is a downgrade.
+                    # we can treat them as a "merge", single step.
+                    assert not ancestors.intersection(filtered_heads)
+                    todo_heads = [head.revision for head in filtered_heads]
+                    step = migration.StampStep(
+                        todo_heads, dest.revision, False, False)
+                    steps.append(step)
+                    continue
+                elif ancestors.intersection(filtered_heads):
+                    # heads are below the target, so this is an upgrade.
+                    # we can treat them as a "merge", single step.
+                    todo_heads = [head.revision for head in filtered_heads]
+                    step = migration.StampStep(
+                        todo_heads, dest.revision, True, False)
+                    steps.append(step)
+                    continue
+                else:
+                    # destination is in a branch not represented,
+                    # treat it as new branch
+                    step = migration.StampStep((), dest.revision, True, True)
+                    steps.append(step)
+                    continue
+            return steps
 
     def run_env(self):
         """Run the script environment.
index bed46302e5f72b1434e8054eb80231e5da898472..66f1f7e76deee8b79d0a00e0bcb1e5d073430987 100644 (file)
@@ -4,6 +4,15 @@ Changelog
 ==========
 
 .. changelog::
+    :version: 0.7.5
+
+    .. change::
+      :tags: feature, versioning
+      :tickets: 267
+
+      Added support for "alembic stamp" to work when given "heads" as an
+      argument, when multiple heads are present.
+
     :version: 0.7.4
     :released: January 12, 2015
 
index d5261b4a1fe4fe54683a9473f0440ebb9a475823..8aef0bf7c8306d9bcbc7b8f28ced619db7d89d7e 100644 (file)
@@ -196,6 +196,12 @@ class RevisionPathTest(MigrationTest):
             self.env._downgrade_revs, self.c.revision[0:4], self.b.revision
         )
 
+    def test_stamp_to_base(self):
+        revs = self.env._stamp_revs("base", self.d.revision)
+        eq_(len(revs), 1)
+        assert revs[0].should_delete_branch
+        eq_(revs[0].delete_version_num, self.d.revision)
+
 
 class BranchedPathTest(MigrationTest):
 
@@ -562,6 +568,17 @@ class ForestTest(MigrationTest):
              self.up_(self.a1), self.up_(self.b1)]
         )
 
+    def test_stamp_to_heads(self):
+        a1, b1, a2, b2 = self.a1, self.b1, self.a2, self.b2
+        revs = self.env._stamp_revs("heads", ())
+        eq_(len(revs), 2)
+        #import pdb
+        #pdb.set_trace()
+        eq_(
+            set(r.to_revisions for r in revs),
+            set([(self.b1.revision,), (self.b2.revision,)])
+        )
+
 
 class MergedPathTest(MigrationTest):