initial collection.
"""
- map_ = {}
+ # Ordering required for some tests to pass (but not required in
+ # general)
+ map_ = sqlautil.OrderedDict()
heads = sqlautil.OrderedSet()
_real_heads = sqlautil.OrderedSet()
return sum([self.get_revisions(id_elem) for id_elem in id_], ())
else:
resolved_id, branch_label = self._resolve_revision_number(id_)
+ if len(resolved_id) == 1:
+ try:
+ rint = int(resolved_id[0])
+ if rint < 0:
+ # branch@-n -> walk down from heads
+ select_heads = self.get_revisions("heads")
+ if branch_label is not None:
+ select_heads = [
+ head
+ for head in select_heads
+ if branch_label in head.branch_labels
+ ]
+ return tuple(
+ self._walk(head, steps=rint)
+ for head in select_heads
+ )
+ except ValueError:
+ # couldn't resolve as integer
+ pass
return tuple(
self._revision_for_ident(rev_id, branch_label)
for rev_id in resolved_id
else:
return util.to_tuple(id_, default=None), branch_label
- def _relative_iterate(
- self,
- destination,
- source,
- is_upwards,
- implicit_base,
- inclusive,
- assert_relative_length,
- ):
- if isinstance(destination, compat.string_types):
- match = _relative_destination.match(destination)
- if not match:
- return None
- else:
- return None
-
- relative = int(match.group(3))
- symbol = match.group(2)
- branch_label = match.group(1)
-
- reldelta = 1 if inclusive and not symbol else 0
-
- if is_upwards:
- if branch_label:
- from_ = "%s@head" % branch_label
- elif symbol:
- if symbol.startswith("head"):
- from_ = symbol
- else:
- from_ = "%s@head" % symbol
- else:
- from_ = "head"
- to_ = source
- else:
- if branch_label:
- to_ = "%s@base" % branch_label
- elif symbol:
- to_ = "%s@base" % symbol
- else:
- to_ = "base"
- from_ = source
-
- revs = list(
- self._iterate_revisions(
- from_, to_, inclusive=inclusive, implicit_base=implicit_base
- )
- )
-
- if symbol:
- if branch_label:
- symbol_rev = self.get_revision(
- "%s@%s" % (branch_label, symbol)
- )
- else:
- symbol_rev = self.get_revision(symbol)
- if symbol.startswith("head"):
- index = 0
- elif symbol == "base":
- index = len(revs) - 1
- else:
- range_ = compat.range(len(revs) - 1, 0, -1)
- for index in range_:
- if symbol_rev.revision == revs[index].revision:
- break
- else:
- index = 0
- else:
- index = 0
- if is_upwards:
- revs = revs[index - relative - reldelta :]
- if (
- not index
- and assert_relative_length
- and len(revs) < abs(relative - reldelta)
- ):
- raise RevisionError(
- "Relative revision %s didn't "
- "produce %d migrations" % (destination, abs(relative))
- )
- else:
- revs = revs[0 : index - relative + reldelta]
- if (
- not index
- and assert_relative_length
- and len(revs) != abs(relative) + reldelta
- ):
- raise RevisionError(
- "Relative revision %s didn't "
- "produce %d migrations" % (destination, abs(relative))
- )
-
- return iter(revs)
-
def iterate_revisions(
self,
upper,
The iterator yields :class:`.Revision` objects.
"""
+ if select_for_downgrade:
+ fn = self._collect_downgrade_revisions
+ else:
+ fn = self._collect_upgrade_revisions
- relative_upper = self._relative_iterate(
- upper,
- lower,
- True,
- implicit_base,
- inclusive,
- assert_relative_length,
- )
- if relative_upper:
- return relative_upper
-
- relative_lower = self._relative_iterate(
- lower,
- upper,
- False,
- implicit_base,
- inclusive,
- assert_relative_length,
- )
- if relative_lower:
- return relative_lower
-
- return self._iterate_revisions(
+ revisions, heads = fn(
upper,
lower,
inclusive=inclusive,
implicit_base=implicit_base,
- select_for_downgrade=select_for_downgrade,
+ assert_relative_length=assert_relative_length,
)
+ for node in self._topological_sort(revisions, heads):
+ yield self.get_revision(node)
+
def _get_descendant_nodes(
self,
targets,
if rev in seen:
continue
seen.add(rev)
- todo.extend(map_[rev_id] for rev_id in fn(rev))
+ # Check for map errors before collecting.
+ for rev_id in fn(rev):
+ next_rev = map_[rev_id]
+ if next_rev.revision != rev_id:
+ raise RevisionError(
+ "Dependency resolution failed; broken map"
+ )
+ todo.append(next_rev)
yield rev
if check:
overlaps = per_target.intersection(targets).difference(
)
)
- def _iterate_revisions(
- self,
- upper,
- lower,
- inclusive=True,
- implicit_base=False,
- select_for_downgrade=False,
- ):
- """iterate revisions from upper to lower.
-
- The traversal is depth-first within branches, and breadth-first
- across branches as a whole.
+ def _topological_sort(self, revisions, heads):
+ """Yield revision ids of a collection of Revision objects in
+ topological sorted order (i.e. revisions always come after their
+ down_revisions and dependencies). Uses the order of keys in
+ _revision_map to sort.
"""
- requested_lowers = self.get_revisions(lower)
+ id_to_rev = self._revision_map
- # some complexity to accommodate an iteration where some
- # branches are starting from nothing, and others are starting
- # from a given point. Additionally, if the bottom branch
- # is specified using a branch identifier, then we limit operations
- # to just that branch.
+ def get_ancestors(rev_id):
+ return {
+ r.revision
+ for r in self._get_ancestor_nodes([id_to_rev[rev_id]])
+ }
- limit_to_lower_branch = isinstance(
- lower, compat.string_types
- ) and lower.endswith("@base")
+ todo = {d.revision for d in revisions}
- uppers = util.dedupe_tuple(self.get_revisions(upper))
+ # Use revision map (ordered dict) key order to pre-sort.
+ inserted_order = list(self._revision_map)
- if not uppers and not requested_lowers:
- return
+ current_heads = list(
+ sorted(
+ {d.revision for d in heads if d.revision in todo},
+ key=inserted_order.index,
+ )
+ )
- upper_ancestors = set(self._get_ancestor_nodes(uppers, check=True))
+ ancestors_by_idx = [get_ancestors(rev_id) for rev_id in current_heads]
- if limit_to_lower_branch:
- lowers = self.get_revisions(self._get_base_revisions(lower))
- elif implicit_base and requested_lowers:
- lower_ancestors = set(self._get_ancestor_nodes(requested_lowers))
- lower_descendants = set(
- self._get_descendant_nodes(requested_lowers)
- )
- base_lowers = set()
- candidate_lowers = upper_ancestors.difference(
- lower_ancestors
- ).difference(lower_descendants)
- for rev in candidate_lowers:
- # note: the use of _normalized_down_revisions as opposed
- # to _all_down_revisions repairs
- # an issue related to looking at a revision in isolation
- # when updating the alembic_version table (issue #789).
- # however, while it seems likely that using
- # _normalized_down_revisions within traversal is more correct
- # than _all_down_revisions, we don't yet have any case to
- # show that it actually makes a difference.
- for downrev in rev._normalized_down_revisions:
- if self._revision_map[downrev] in candidate_lowers:
- break
+ output = []
+
+ current_candidate_idx = 0
+ while current_heads:
+
+ candidate = current_heads[current_candidate_idx]
+
+ for check_head_index, ancestors in enumerate(ancestors_by_idx):
+ # scan all the heads. see if we can continue walking
+ # down the current branch indicated by current_candidate_idx.
+ if (
+ check_head_index != current_candidate_idx
+ and candidate in ancestors
+ ):
+ current_candidate_idx = check_head_index
+ # nope, another head is dependent on us, they have
+ # to be traversed first
+ break
+ else:
+ # yup, we can emit
+ if candidate in todo:
+ output.append(candidate)
+ todo.remove(candidate)
+
+ # now update the heads with our ancestors.
+
+ candidate_rev = id_to_rev[candidate]
+
+ # immediate ancestor nodes
+ heads_to_add = [
+ r
+ for r in candidate_rev._normalized_down_revisions
+ if r in todo and r not in current_heads
+ ]
+
+ if not heads_to_add:
+ # no ancestors, so remove this head from the list
+ del current_heads[current_candidate_idx]
+ del ancestors_by_idx[current_candidate_idx]
+ current_candidate_idx = max(current_candidate_idx - 1, 0)
else:
- base_lowers.add(rev)
- lowers = base_lowers.union(requested_lowers)
- elif implicit_base:
- base_lowers = set(self.get_revisions(self._real_bases))
- lowers = base_lowers.union(requested_lowers)
- elif not requested_lowers:
- lowers = set(self.get_revisions(self._real_bases))
- else:
- lowers = requested_lowers
- # represents all nodes we will produce
- total_space = set(
- rev.revision for rev in upper_ancestors
- ).intersection(
- rev.revision
- for rev in self._get_descendant_nodes(
- lowers,
- check=True,
- omit_immediate_dependencies=(
- select_for_downgrade and requested_lowers
+ if (
+ not candidate_rev._normalized_resolved_dependencies
+ and len(candidate_rev._versioned_down_revisions) == 1
+ ):
+ current_heads[current_candidate_idx] = heads_to_add[0]
+
+ # for plain movement down a revision line without
+ # any mergepoints, branchpoints, or deps, we
+ # can update the ancestors collection directly
+ # by popping out the candidate we just emitted
+ ancestors_by_idx[current_candidate_idx].discard(
+ candidate
+ )
+
+ else:
+ # otherwise recalculate it again, things get
+ # complicated otherwise. This can possibly be
+ # improved to not run the whole ancestor thing
+ # each time but it was getting complicated
+ current_heads[current_candidate_idx] = heads_to_add[0]
+ current_heads.extend(heads_to_add[1:])
+ ancestors_by_idx[
+ current_candidate_idx
+ ] = get_ancestors(heads_to_add[0])
+ ancestors_by_idx.extend(
+ get_ancestors(head) for head in heads_to_add[1:]
+ )
+
+ assert not todo
+ return output
+
+ def _walk(self, start, steps, branch_label=None, no_overwalk=True):
+ """
+ Walk the requested number of :steps up (steps > 0) or down (steps < 0)
+ the revision tree.
+
+ :branch_label is used to select branches only when walking up.
+
+ If the walk goes past the boundaries of the tree and :no_overwalk is
+ True, None is returned, otherwise the walk terminates early.
+
+ A RevisionError is raised if there is no unambiguous revision to
+ walk to.
+ """
+
+ if isinstance(start, compat.string_types):
+ start = self.get_revision(start)
+
+ for _ in range(abs(steps)):
+ if steps > 0:
+ # Walk up
+ children = [
+ rev
+ for rev in self.get_revisions(
+ self.bases if start is None else start.nextrev
+ )
+ ]
+ if branch_label:
+ children = self.filter_for_lineage(children, branch_label)
+ else:
+ # Walk down
+ if start == "base":
+ children = tuple()
+ else:
+ children = self.get_revisions(
+ self.heads if start is None else start.down_revision
+ )
+ if not children:
+ children = ("base",)
+ if not children:
+ # This will return an invalid result if no_overwalk, otherwise
+ # further steps will stay where we are.
+ return None if no_overwalk else start
+ elif len(children) > 1:
+ raise RevisionError("Ambiguous walk")
+ start = children[0]
+
+ return start
+
+ def _parse_downgrade_target(
+ self, current_revisions, target, assert_relative_length
+ ):
+ """
+ Parse downgrade command syntax :target to retrieve the target revision
+ and branch label (if any) given the :current_revisons stamp of the
+ database.
+
+ Returns a tuple (branch_label, target_revision) where branch_label
+ is a string from the command specifying the branch to consider (or
+ None if no branch given), and target_revision is a Revision object
+ which the command refers to. target_revsions is None if the command
+ refers to 'base'. The target may be specified in absolute form, or
+ relative to :current_revisions.
+ """
+ if target is None:
+ return None, None
+ assert isinstance(
+ target, compat.string_types
+ ), "Expected downgrade target in string form"
+ match = _relative_destination.match(target)
+ if match:
+ branch_label, symbol, relative = match.groups()
+ rel_int = int(relative)
+ if rel_int >= 0:
+ if symbol is None:
+ # Downgrading to current + n is not valid.
+ raise RevisionError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (relative, abs(rel_int))
+ )
+ # Find target revision relative to given symbol.
+ rev = self._walk(
+ symbol,
+ rel_int,
+ branch_label,
+ no_overwalk=assert_relative_length,
+ )
+ if rev is None:
+ raise RevisionError("Walked too far")
+ return branch_label, rev
+ else:
+ relative_revision = symbol is None
+ if relative_revision:
+ # Find target revision relative to current state.
+ if branch_label:
+ symbol = self.filter_for_lineage(
+ util.to_tuple(current_revisions), branch_label
+ )
+ assert len(symbol) == 1
+ symbol = symbol[0]
+ else:
+ current_revisions = util.to_tuple(current_revisions)
+ if not current_revisions:
+ raise RevisionError(
+ "Relative revision %s didn't "
+ "produce %d migrations"
+ % (relative, abs(rel_int))
+ )
+ # Have to check uniques here for duplicate rows test.
+ if len(set(current_revisions)) > 1:
+ util.warn(
+ "downgrade -1 from multiple heads is "
+ "ambiguous; "
+ "this usage will be disallowed in a future "
+ "release."
+ )
+ symbol = current_revisions[0]
+ # Restrict iteration to just the selected branch when
+ # ambiguous branches are involved.
+ branch_label = symbol
+ # Walk down the tree to find downgrade target.
+ rev = self._walk(
+ start=self.get_revision(symbol)
+ if branch_label is None
+ else self.get_revision("%s@%s" % (branch_label, symbol)),
+ steps=rel_int,
+ no_overwalk=assert_relative_length,
+ )
+ if rev is None:
+ if relative_revision:
+ raise RevisionError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (relative, abs(rel_int))
+ )
+ else:
+ raise RevisionError("Walked too far")
+ return branch_label, rev
+
+ # No relative destination given, revision specified is absolute.
+ branch_label, _, symbol = target.rpartition("@")
+ if not branch_label:
+ branch_label is None
+ return branch_label, self.get_revision(symbol)
+
+ def _parse_upgrade_target(
+ self, current_revisions, target, assert_relative_length
+ ):
+ """
+ Parse upgrade command syntax :target to retrieve the target revision
+ and given the :current_revisons stamp of the database.
+
+ Returns a tuple of Revision objects which should be iterated/upgraded
+ to. The target may be specified in absolute form, or relative to
+ :current_revisions.
+ """
+ if isinstance(target, compat.string_types):
+ match = _relative_destination.match(target)
+ else:
+ match = None
+
+ if not match:
+ # No relative destination, target is absolute.
+ return self.get_revisions(target)
+
+ current_revisions = util.to_tuple(current_revisions)
+
+ branch_label, symbol, relative = match.groups()
+ relative_str = relative
+ relative = int(relative)
+ if relative > 0:
+ if symbol is None:
+ if not current_revisions:
+ current_revisions = (None,)
+ # Try to filter to a single target (avoid ambiguous branches).
+ start_revs = current_revisions
+ if branch_label:
+ start_revs = self.filter_for_lineage(
+ self.get_revisions(current_revisions), branch_label
+ )
+ if not start_revs:
+ # The requested branch is not a head, so we need to
+ # backtrack to find a branchpoint.
+ active_on_branch = self.filter_for_lineage(
+ self._get_ancestor_nodes(
+ self.get_revisions(current_revisions)
+ ),
+ branch_label,
+ )
+ # Find the tips of this set of revisions (revisions
+ # without children within the set).
+ start_revs = tuple(
+ {rev.revision for rev in active_on_branch}
+ - {
+ down
+ for rev in active_on_branch
+ for down in rev._normalized_down_revisions
+ }
+ )
+ if not start_revs:
+ # We must need to go right back to base to find
+ # a starting point for this branch.
+ start_revs = (None,)
+ if len(start_revs) > 1:
+ raise RevisionError(
+ "Ambiguous upgrade from multiple current revisions"
+ )
+ # Walk up from unique target revision.
+ rev = self._walk(
+ start=start_revs[0],
+ steps=relative,
+ branch_label=branch_label,
+ no_overwalk=assert_relative_length,
+ )
+ if rev is None:
+ raise RevisionError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (relative_str, abs(relative))
+ )
+ return (rev,)
+ else:
+ # Walk is relative to a given revision, not the current state.
+ return (
+ self._walk(
+ start=self.get_revision(symbol),
+ steps=relative,
+ branch_label=branch_label,
+ no_overwalk=assert_relative_length,
+ ),
+ )
+ else:
+ if symbol is None:
+ # Upgrading to current - n is not valid.
+ raise RevisionError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (relative, abs(relative))
+ )
+ return (
+ self._walk(
+ start=self.get_revision(symbol)
+ if branch_label is None
+ else self.get_revision("%s@%s" % (branch_label, symbol)),
+ steps=relative,
+ no_overwalk=assert_relative_length,
),
)
+
+ def _collect_downgrade_revisions(
+ self, upper, target, inclusive, implicit_base, assert_relative_length
+ ):
+ """
+ Compute the set of current revisions specified by :upper, and the
+ downgrade target specified by :target. Return all dependents of target
+ which are currently active.
+
+ :inclusive=True includes the target revision in the set
+ """
+
+ branch_label, target_revision = self._parse_downgrade_target(
+ current_revisions=upper,
+ target=target,
+ assert_relative_length=assert_relative_length,
)
+ if target_revision == "base":
+ target_revision = None
+ assert target_revision is None or isinstance(target_revision, Revision)
+
+ # Find candidates to drop.
+ if target_revision is None:
+ # Downgrading back to base: find all tree roots.
+ roots = [
+ rev
+ for rev in self._revision_map.values()
+ if rev is not None and rev.down_revision is None
+ ]
+ elif inclusive:
+ # inclusive implies target revision should also be dropped
+ roots = [target_revision]
+ else:
+ # Downgrading to fixed target: find all direct children.
+ roots = list(self.get_revisions(target_revision.nextrev))
- if not total_space:
- # no nodes. determine if this is an invalid range
- # or not.
- start_from = set(requested_lowers)
- start_from.update(
- self._get_ancestor_nodes(
- list(start_from), include_dependencies=True
+ if branch_label and len(roots) > 1:
+ # Need to filter roots.
+ ancestors = {
+ rev.revision
+ for rev in self._get_ancestor_nodes(
+ [self._resolve_branch(branch_label)],
+ include_dependencies=False,
+ )
+ }
+ # Intersection gives the root revisions we are trying to
+ # rollback with the downgrade.
+ roots = list(
+ self.get_revisions(
+ {rev.revision for rev in roots}.intersection(ancestors)
)
)
- # determine all the current branch points represented
- # by requested_lowers
- start_from = self._filter_into_branch_heads(start_from)
+ # Ensure we didn't throw everything away.
+ if len(roots) == 0:
+ raise RevisionError(
+ "Not a valid downgrade target from current heads"
+ )
- # if the requested start is one of those branch points,
- # then just return empty set
- if start_from.intersection(upper_ancestors):
- return
- else:
- # otherwise, they requested nodes out of
- # order
- raise RangeNotAncestorError(lower, upper)
-
- # organize branch points to be consumed separately from
- # member nodes
- branch_todo = set(
- rev
- for rev in (self._revision_map[rev] for rev in total_space)
- if rev._is_real_branch_point
- and len(total_space.intersection(rev._all_nextrev)) > 1
+ heads = self.get_revisions(upper)
+
+ # Aim is to drop :branch_revision; to do so we also need to drop its
+ # descendents and anything dependent on it.
+ downgrade_revisions = set(
+ self._get_descendant_nodes(
+ roots,
+ include_dependencies=True,
+ omit_immediate_dependencies=False,
+ )
+ )
+ active_revisions = set(
+ self._get_ancestor_nodes(heads, include_dependencies=True)
)
+ # Emit revisions to drop in reverse topological sorted order.
+ downgrade_revisions.intersection_update(active_revisions)
+
+ if implicit_base:
+ # Wind other branches back to base.
+ downgrade_revisions.update(
+ active_revisions.difference(self._get_ancestor_nodes(roots))
+ )
+
+ if not downgrade_revisions:
+ # Empty intersection: target revs are not present.
+ raise RangeNotAncestorError("Nothing to drop", upper)
- # it's not possible for any "uppers" to be in branch_todo,
- # because the ._all_nextrev of those nodes is not in total_space
- # assert not branch_todo.intersection(uppers)
+ return downgrade_revisions, heads
- todo = collections.deque(
- r for r in uppers if r.revision in total_space
+ def _collect_upgrade_revisions(
+ self, upper, lower, inclusive, implicit_base, assert_relative_length
+ ):
+ """
+ Compute the set of required revisions specified by :upper, and the
+ current set of active revisions specified by :lower. Find the
+ difference between the two to compute the required upgrades.
+
+ :inclusive=True includes the current/lower revisions in the set
+
+ :implicit_base=False only returns revisions which are downstream
+ of the current/lower revisions. Dependencies from branches with
+ different bases will not be included.
+ """
+ targets = self._parse_upgrade_target(
+ current_revisions=lower,
+ target=upper,
+ assert_relative_length=assert_relative_length,
)
- # iterate for total_space being emptied out
- total_space_modified = True
- while total_space:
+ assert targets is not None
+ assert type(targets) is tuple, "targets should be a tuple"
+
+ # Handled named bases (e.g. branch@... -> heads should only produce
+ # targets on the given branch)
+ if isinstance(lower, compat.string_types) and "@" in lower:
+ branch, _, _ = lower.partition("@")
+ branch_rev = self.get_revision(branch)
+ if branch_rev is not None and branch_rev.revision == branch:
+ # A revision was used as a label; get its branch instead
+ assert len(branch_rev.branch_labels) == 1
+ branch = next(iter(branch_rev.branch_labels))
+ targets = {
+ need for need in targets if branch in need.branch_labels
+ }
+
+ required_node_set = set(
+ self._get_ancestor_nodes(
+ targets, check=True, include_dependencies=True
+ )
+ ).union(targets)
- if not total_space_modified:
- raise RevisionError(
- "Dependency resolution failed; iteration can't proceed"
- )
- total_space_modified = False
- # when everything non-branch pending is consumed,
- # add to the todo any branch nodes that have no
- # descendants left in the queue
- if not todo:
- todo.extendleft(
- sorted(
- (
- rev
- for rev in branch_todo
- if not rev._all_nextrev.intersection(total_space)
- ),
- # favor "revisioned" branch points before
- # dependent ones
- key=lambda rev: 0 if rev.is_branch_point else 1,
- )
- )
- branch_todo.difference_update(todo)
- # iterate nodes that are in the immediate todo
- while todo:
- rev = todo.popleft()
- total_space.remove(rev.revision)
- total_space_modified = True
-
- # do depth first for elements within branches,
- # don't consume any actual branch nodes
- todo.extendleft(
- [
- self._revision_map[downrev]
- for downrev in reversed(rev._normalized_down_revisions)
- if self._revision_map[downrev] not in branch_todo
- and downrev in total_space
- ]
- )
+ current_revisions = self.get_revisions(lower)
+ if not implicit_base and any(
+ rev not in required_node_set
+ for rev in current_revisions
+ if rev is not None
+ ):
+ raise RangeNotAncestorError(lower, upper)
+ assert (
+ type(current_revisions) is tuple
+ ), "current_revisions should be a tuple"
+
+ # Special case where lower = a relative value (get_revisions can't
+ # find it)
+ if current_revisions and current_revisions[0] is None:
+ _, rev = self._parse_downgrade_target(
+ current_revisions=upper,
+ target=lower,
+ assert_relative_length=assert_relative_length,
+ )
+ if rev == "base":
+ current_revisions = tuple()
+ lower = None
+ else:
+ current_revisions = (rev,)
+ lower = rev.revision
- if not inclusive and rev in requested_lowers:
- continue
- yield rev
+ current_node_set = set(
+ self._get_ancestor_nodes(
+ current_revisions, check=True, include_dependencies=True
+ )
+ ).union(current_revisions)
+
+ needs = required_node_set.difference(current_node_set)
+
+ # Include the lower revision (=current_revisions?) in the iteration
+ if inclusive:
+ needs.update(self.get_revisions(lower))
+ # By default, base is implicit as we want all dependencies returned.
+ # Base is also implicit if lower = base
+ # implicit_base=False -> only return direct downstreams of
+ # current_revisions
+ if current_revisions and not implicit_base:
+ lower_descendents = self._get_descendant_nodes(
+ current_revisions, check=True, include_dependencies=False
+ )
+ needs.intersection_update(lower_descendents)
- assert not branch_todo
+ return needs, targets
class Revision(object):
+from sqlalchemy.testing import util as sqla_testing_util
+
from alembic.script.revision import CycleDetected
from alembic.script.revision import DependencyCycleDetected
from alembic.script.revision import DependencyLoopDetected
)
eq_(
- [r.revision for r in map_._iterate_revisions(("c", "c"), "a")],
- ["c", "b", "a"],
+ [
+ r.revision
+ for r in map_.iterate_revisions(
+ ("c", "c"), "a", inclusive=False
+ )
+ ],
+ # Not inclusive so should not traverse a
+ ["c", "b"],
)
def test_repr_revs(self):
):
if map_ is None:
map_ = self.map
+
+ result = [
+ rev.revision
+ for rev in map_.iterate_revisions(
+ upper,
+ lower,
+ inclusive=inclusive,
+ implicit_base=implicit_base,
+ select_for_downgrade=select_for_downgrade,
+ )
+ ]
+
+ edges = [
+ (rev, child.revision)
+ for child in map_._revision_map.values()
+ if child is not None
+ for rev in child._normalized_down_revisions
+ ]
+
+ assert sqla_testing_util.conforms_partial_ordering(
+ edges, list(reversed(result))
+ )
+
eq_(
- [
- rev.revision
- for rev in map_.iterate_revisions(
- upper,
- lower,
- inclusive=inclusive,
- implicit_base=implicit_base,
- select_for_downgrade=select_for_downgrade,
- )
- ],
+ result,
assertion,
)
)
def test_branch_w_up_relative(self):
- self._assert_iteration(
- "ebranch@+2", "base", ["someothername", "e", "d"]
- )
+ # In the absence of a branch point surely the +2 is relative to base?
+ # So 'someothername' would be referenced by ebranch@+3?
+ self._assert_iteration("ebranch@+2", "base", ["e", "d"])
def test_partial_id_resolve(self):
eq_(self.map.get_revision("ebranch@some").revision, "someothername")
def test_iterate_multiple_branch_to_base(self):
self._assert_iteration(
- ["d3cb2", "cb1"], "base", ["d3cb2", "cb2", "b2", "cb1", "b1", "a"]
+ ["d3cb2", "cb1"], "base", ["cb1", "b1", "d3cb2", "cb2", "b2", "a"]
)
def test_iterate_multiple_heads_single_base(self):
RevisionError,
r"Revision d1cb1 is not an ancestor of revision b1",
list,
- self.map._iterate_revisions("b1", "d1cb1"),
+ self.map.iterate_revisions("b1", "d1cb1"),
)
def test_distinct_branches(self):
RevisionError,
r"Revision b1 is not an ancestor of revision d2cb2",
list,
- self.map._iterate_revisions("d2cb2", "b1"),
+ self.map.iterate_revisions("d2cb2", "b1"),
)
def test_wrong_direction_to_base_as_none(self):
RevisionError,
r"Revision d1cb1 is not an ancestor of revision base",
list,
- self.map._iterate_revisions(None, "d1cb1"),
+ self.map.iterate_revisions(None, "d1cb1"),
)
def test_wrong_direction_to_base_as_empty(self):
RevisionError,
r"Revision d1cb1 is not an ancestor of revision base",
list,
- self.map._iterate_revisions((), "d1cb1"),
+ self.map.iterate_revisions((), "d1cb1"),
)
["merge", "fe1b1"],
"a3",
[
+ "fe1b1",
+ "e1b1",
"merge",
"e2b1",
+ "db1",
+ "cb1",
+ "b1",
"e2b2",
"db2",
"cb2",
"b2",
- "fe1b1",
- "e1b1",
- "db1",
- "cb1",
- "b1",
"a3",
],
)
["merge", "fe1b1"],
"a1",
[
- "merge",
- "e2b1", # e2b1 branch
- "e2b2",
- "db2",
- "cb2",
- "b2", # e2b2 branch
"fe1b1",
"e1b1", # fe1b1 branch
+ "merge",
+ "e2b1", # e2b1 branch
"db1", # fe1b1 and e2b1 branches terminate at db1
"cb1",
"b1", # e2b1 branch continued....might be nicer
# if this was before the e2b2 branch...
+ "e2b2",
+ "db2",
+ "cb2",
+ "b2", # e2b2 branch
"a3", # e2b1 and e2b2 branches terminate at a3
"a2",
"a1", # finish out
["merge", "fe1b1"],
["cb1", "cb2"],
[
+ "fe1b1",
+ "e1b1",
"merge",
"e2b1",
+ "db1",
+ "cb1",
"e2b2",
"db2",
"cb2",
- "fe1b1",
- "e1b1",
- "db1",
- "cb1",
],
)
self._assert_iteration(
["merge", "fe1b1"],
["cb1", "cb2"],
- ["merge", "e2b1", "e2b2", "db2", "fe1b1", "e1b1", "db1"],
+ ["fe1b1", "e1b1", "merge", "e2b1", "db1", "e2b2", "db2"],
inclusive=False,
)
"Requested revision fe1b1 overlaps "
"with other requested revisions",
list,
- self.map._iterate_revisions(["db1", "b2", "fe1b1"], ()),
+ self.map.iterate_revisions(["db1", "b2", "fe1b1"], ()),
)
def test_three_branches_end_multiple_bases_exclusive_blank(self):
["e2b1", "b2", "fe1b1"],
(),
[
- "e2b1",
"b2",
"fe1b1",
"e1b1",
+ "e2b1",
"db1",
"cb1",
"b1",
def test_ancestor_nodes(self):
merge = self.map.get_revision("merge")
eq_(
- set(
+ {
rev.revision
for rev in self.map._get_ancestor_nodes([merge], check=True)
- ),
- set(
- [
- "a1",
- "e2b2",
- "e2b1",
- "cb2",
- "merge",
- "a3",
- "a2",
- "b1",
- "b2",
- "db1",
- "db2",
- "cb1",
- ]
- ),
+ },
+ {
+ "a1",
+ "e2b2",
+ "e2b1",
+ "cb2",
+ "merge",
+ "a3",
+ "a2",
+ "b1",
+ "b2",
+ "db1",
+ "db2",
+ "cb1",
+ },
)
"a1a",
"b1b",
"a1b",
+ "base1",
"mergeb3d2",
"b3",
"a3",
"b2",
"a2",
"base2",
- "base1",
],
)
"a1a",
"b1b",
"a1b",
+ "base1",
"mergeb3d2",
"b3",
"a3",
"b2",
"a2",
"base2",
- "base1",
],
inclusive=False,
)
"a1a",
"b1b",
"a1b",
+ "base1",
"mergeb3d2",
"b3",
"a3",
"b2",
"a2",
"base2",
- "base1",
],
)
def test_detect_invalid_base_selection(self):
assert_raises_message(
RevisionError,
- "Requested revision a2 overlaps with " "other requested revisions",
+ "overlaps with other requested revisions",
list,
- self.map._iterate_revisions(["c2"], ["a2", "b2"]),
+ self.map.iterate_revisions(["c2"], ["a2", "b2"]),
)
def test_heads_to_revs_plus_implicit_base_exclusive(self):
"a1a",
"b1b",
"a1b",
+ "base1",
"mergeb3d2",
"b3",
"a3",
"base3",
"d2",
- "base1",
],
inclusive=False,
implicit_base=True,
"a1a",
"b1b",
"a1b",
+ "base1",
"mergeb3d2",
"b3",
"a3",
"base3",
"d2",
"c2",
- "base1",
],
implicit_base=True,
)
self._assert_iteration(
["b3", "b2"],
"base3",
- ["b3", "a3", "b2", "a2", "base2"],
+ ["b2", "a2", "base2", "b3", "a3"],
inclusive=False,
implicit_base=True,
)
"a1a",
"b1b",
"a1b",
+ "base1",
"d2",
"c2",
"b2",
"b3",
"a3",
"base3",
- "base1",
],
)
"a1a",
"b1b",
"a1b",
+ "base1",
"d2",
"c2",
"b2",
"b3",
"a3",
"base3",
- "base1",
],
select_for_downgrade=True,
)
RevisionError,
r"Revision d2 is not an ancestor of revision b2",
list,
- self.map._iterate_revisions("b2", "d2"),
+ self.map.iterate_revisions("b2", "d2"),
)
def test_different_branch_not_wrong_direction(self):
- self._assert_iteration("b3", "d2", [])
+ # Changed from empty list. Expect this should raise an error in
+ # --sql mode (since there is not a direct path), or in upgrade mode
+ # it should return revision b3, not an empty list.
+ assert_raises_message(
+ RevisionError,
+ r"Revision d2 is not an ancestor of revision b3",
+ list,
+ self.map.iterate_revisions("b3", "d2"),
+ )
def test_we_need_head2_upgrade(self):
# the 2 branch relies on the 3 branch
)
def test_we_need_base2_upgrade(self):
- # consider a downgrade to b_2@base - we
- # want to run through all the "2"s alone, and we're done.
+ # This is an upgrade from base, so deps should be included and
+ # the result should be different to the downgrade case below
self._assert_iteration(
- "heads", "b_2@base", ["d2", "c2", "b2", "a2", "base2"]
+ "heads",
+ "b_2@base",
+ ["d2", "c2", "b2", "a2", "base2", "a3", "base3"],
)
def test_we_need_base2_downgrade(self):
)
def test_we_need_base3_upgrade(self):
- self._assert_iteration(
- "heads", "b_3@base", ["b1b", "d2", "c2", "b3", "a3", "base3"]
- )
+ # branch b_3 has no dependencies, so b1b/d2/c2 not needed
+ self._assert_iteration("heads", "b_3@base", ["b3", "a3", "base3"])
def test_we_need_base3_downgrade(self):
# consider a downgrade to b_3@base - due to the a3 dependency, we
self._assert_iteration("b_1@head", "base", ["c1", "b1", "a1", "base1"])
def test_we_need_base1(self):
+ # b_1 has no dependencies
self._assert_iteration(
"heads",
"b_1@base",
"c1",
"b1",
"a1",
- "d2",
- "c2",
- "d3",
- "c3",
- "b2",
- "a2",
- "base2",
"base1",
],
)
def test_we_need_base2(self):
+ # base2 depends on base1, nobody depends on b_3 so removed d3,c3
self._assert_iteration(
- "heads", "b_2@base", ["d2", "c2", "d3", "c3", "b2", "a2", "base2"]
+ "heads", "b_2@base", ["d2", "c2", "b2", "a2", "base2", "base1"]
)
def test_we_need_base3(self):
+ # c3 depends on b2 -> add b2,a2,base2, base2 depends on base1
+ self._assert_iteration(
+ "heads",
+ "b_3@base",
+ ["d3", "c3", "b3", "a3", "base3", "b2", "a2", "base2", "base1"],
+ )
+
+
+class MultipleBaseCrossDependencyTestThree(DownIterateTest):
+ def setUp(self):
+ self.map = RevisionMap(
+ lambda: [
+ Revision("base1", ()),
+ Revision("a1", "base1"),
+ Revision("b1", "a1"),
+ Revision("c2", (), dependencies="a1"),
+ Revision("c3", "c2"),
+ ]
+ )
+
+ def test_traverse_no_parent_but_a_dep(self):
self._assert_iteration(
- "heads", "b_3@base", ["d3", "c3", "b3", "a3", "base3"]
+ "heads",
+ "base",
+ ["b1", "c3", "c2", "a1", "base1"],
)
from alembic.migration import MigrationStep
from alembic.testing import assert_raises_message
from alembic.testing import eq_
+from alembic.testing import expect_warnings
from alembic.testing import mock
from alembic.testing.env import clear_staging_env
from alembic.testing.env import staging_env
set([self.c2.revision]),
)
- def test_relative_downgrade(self):
+ def test_relative_downgrade_baseplus2(self):
+ """ base+2 points to b, no branch label, drop everything above b. """
+ self._assert_downgrade(
+ "base+2",
+ [self.d2.revision, self.d1.revision],
+ [
+ self.down_(self.d1),
+ self.down_(self.c1),
+ self.down_(self.d2),
+ self.down_(self.c2),
+ ],
+ set([self.b.revision]),
+ )
+ def test_relative_downgrade_branchplus2(self):
+ """
+ Correct behaviour (per
+ https://github.com/sqlalchemy/alembic/pull/763#issuecomment-738741297)
+ Only the c2branch should be downgraded, right back to base+2 = b
+ """
self._assert_downgrade(
"c2branch@base+2",
[self.d2.revision, self.d1.revision],
- [self.down_(self.d2), self.down_(self.c2), self.down_(self.d1)],
+ [self.down_(self.d2), self.down_(self.c2)],
+ set([self.d1.revision]),
+ )
+
+ def test_relative_downgrade_branchplus3(self):
+ """ c2branch@base+3 equivalent to c2. """
+ self._assert_downgrade(
+ self.c2.revision,
+ [self.d2.revision, self.d1.revision],
+ [self.down_(self.d2)],
+ set([self.d1.revision, self.c2.revision]),
+ )
+ self._assert_downgrade(
+ "c2branch@base+3",
+ [self.d2.revision, self.d1.revision],
+ [self.down_(self.d2)],
+ set([self.d1.revision, self.c2.revision]),
+ )
+
+ # Old downgrade -1 behaviour depends on order of branch upgrades.
+ # This should probably fail (ambiguous) but is currently documented
+ # as a key use case in branching.
+
+ def test_downgrade_once_order_right(self):
+ with expect_warnings("downgrade -1 from multiple heads is ambiguous;"):
+ self._assert_downgrade(
+ "-1",
+ [self.d2.revision, self.d1.revision],
+ [self.down_(self.d2)],
+ set([self.d1.revision, self.c2.revision]),
+ )
+
+ def test_downgrade_once_order_right_unbalanced(self):
+ with expect_warnings("downgrade -1 from multiple heads is ambiguous;"):
+ self._assert_downgrade(
+ "-1",
+ [self.c2.revision, self.d1.revision],
+ [self.down_(self.c2)],
+ set([self.d1.revision]),
+ )
+
+ def test_downgrade_once_order_left(self):
+ with expect_warnings("downgrade -1 from multiple heads is ambiguous;"):
+ self._assert_downgrade(
+ "-1",
+ [self.d1.revision, self.d2.revision],
+ [self.down_(self.d1)],
+ set([self.d2.revision, self.c1.revision]),
+ )
+
+ def test_downgrade_once_order_left_unbalanced(self):
+ with expect_warnings("downgrade -1 from multiple heads is ambiguous;"):
+ self._assert_downgrade(
+ "-1",
+ [self.c1.revision, self.d2.revision],
+ [self.down_(self.c1)],
+ set([self.d2.revision]),
+ )
+
+ def test_downgrade_once_order_left_unbalanced_labelled(self):
+ self._assert_downgrade(
+ "c1branch@-1",
+ [self.d1.revision, self.d2.revision],
+ [self.down_(self.d1)],
+ set([self.c1.revision, self.d2.revision]),
+ )
+
+ # Captures https://github.com/sqlalchemy/alembic/issues/765
+
+ def test_downgrade_relative_order_right(self):
+ self._assert_downgrade(
+ "{}-1".format(self.d2.revision),
+ [self.d2.revision, self.c1.revision],
+ [self.down_(self.d2)],
+ set([self.c1.revision, self.c2.revision]),
+ )
+
+ def test_downgrade_relative_order_left(self):
+ self._assert_downgrade(
+ "{}-1".format(self.d2.revision),
+ [self.c1.revision, self.d2.revision],
+ [self.down_(self.d2)],
+ set([self.c1.revision, self.c2.revision]),
+ )
+
+ def test_downgrade_single_branch_c1branch(self):
+ """ Use branch label to specify the branch to downgrade. """
+ self._assert_downgrade(
+ "c1branch@{}".format(self.b.revision),
+ (self.c1.revision, self.d2.revision),
+ [
+ self.down_(self.c1),
+ ],
+ set([self.d2.revision]),
+ )
+
+ def test_downgrade_single_branch_c1branch_from_d1_head(self):
+ """Use branch label to specify the branch (where the branch label is
+ not on the head revision)."""
+ self._assert_downgrade(
+ "c2branch@{}".format(self.b.revision),
+ (self.c1.revision, self.d2.revision),
+ [
+ self.down_(self.d2),
+ self.down_(self.c2),
+ ],
set([self.c1.revision]),
)
+ def test_downgrade_single_branch_c2(self):
+ """Use a revision on the branch (not head) to specify the branch."""
+ self._assert_downgrade(
+ "{}@{}".format(self.c2.revision, self.b.revision),
+ (self.d1.revision, self.d2.revision),
+ [
+ self.down_(self.d2),
+ self.down_(self.c2),
+ ],
+ set([self.d1.revision]),
+ )
+
+ def test_downgrade_single_branch_d1(self):
+ """ Use the head revision to specify the branch. """
+ self._assert_downgrade(
+ "{}@{}".format(self.d1.revision, self.b.revision),
+ (self.d1.revision, self.d2.revision),
+ [
+ self.down_(self.d1),
+ self.down_(self.c1),
+ ],
+ set([self.d2.revision]),
+ )
+
+ def test_downgrade_relative_to_branch_head(self):
+ self._assert_downgrade(
+ "c1branch@head-1",
+ (self.d1.revision, self.d2.revision),
+ [self.down_(self.d1)],
+ set([self.c1.revision, self.d2.revision]),
+ )
+
+ def test_upgrade_other_branch_from_mergepoint(self):
+ # Advance c2branch forward by one, meaning one past the mergepoint
+ # in this case.
+ self._assert_upgrade(
+ "c2branch@+1",
+ (self.c1.revision),
+ [self.up_(self.c2)],
+ set([self.c1.revision, self.c2.revision]),
+ )
+
+ def test_upgrade_one_branch_of_heads(self):
+ # Still a bit of ambiguity here ... does this mean an absolute
+ # revision "goto revision c2 (labelled c2branch), +1", or "move up
+ # one revision from current along c2branch"?
+ self._assert_upgrade(
+ "c2branch@+1",
+ (self.c1.revision, self.c2.revision),
+ [self.up_(self.d2)],
+ set([self.c1.revision, self.d2.revision]),
+ )
+
+ def test_ambiguous_upgrade(self):
+ assert_raises_message(
+ util.CommandError,
+ "Ambiguous upgrade from multiple current revisions",
+ self.env._upgrade_revs,
+ "+1",
+ [self.c1.revision, self.c2.revision],
+ )
+
+ def test_not_a_downgrade(self):
+ assert_raises_message(
+ util.CommandError,
+ "Not a valid downgrade target from current heads",
+ self.env._downgrade_revs,
+ self.d2.revision,
+ [self.d1.revision, self.d2.revision],
+ )
+
+ def test_upgrade_from_base(self):
+ self._assert_upgrade(
+ "base+1", [], [self.up_(self.a)], set([self.a.revision])
+ )
+
+ def test_upgrade_from_base_implicit(self):
+ self._assert_upgrade(
+ "+1", [], [self.up_(self.a)], set([self.a.revision])
+ )
+
+ def test_downgrade_minus1_to_base(self):
+ self._assert_downgrade(
+ "-1", [self.a.revision], [self.down_(self.a)], set()
+ )
+
+ def test_downgrade_minus1_from_base(self):
+ assert_raises_message(
+ util.CommandError,
+ "Relative revision -1 didn't produce 1 migrations",
+ self.env._downgrade_revs,
+ "-1",
+ [],
+ )
+
class BranchFromMergepointTest(MigrationTest):
clear_staging_env()
def test_mergepoint_to_only_one_side_upgrade(self):
-
self._assert_upgrade(
self.d1.revision,
(self.d2.revision, self.b1.revision),
self.cmerge.revision,
]
+ # this ordering can vary a lot based on what
+ # sorting algorithm is in use because it's all
+ # heads
self._assert_downgrade(
"base",
heads,
[
self.down_(self.amerge),
self.down_(self.a1),
- self.down_(self.a2),
- self.down_(self.a3),
self.down_(self.b1),
self.down_(self.b2),
self.down_(self.cmerge),
self.down_(self.c1),
+ self.down_(self.a2),
+ self.down_(self.a3),
self.down_(self.c2),
self.down_(self.c3),
],
self.down_(self.c2), # c2->b, delete branch
],
)
+
+
+class BranchedPathTestCrossDependencies(MigrationTest):
+ @classmethod
+ def setup_class(cls):
+ cls.env = env = staging_env()
+ cls.a = env.generate_revision(util.rev_id(), "->a")
+ cls.b = env.generate_revision(util.rev_id(), "a->b")
+
+ cls.c1 = env.generate_revision(
+ util.rev_id(), "b->c1", branch_labels="c1branch", refresh=True
+ )
+ cls.d1 = env.generate_revision(util.rev_id(), "c1->d1")
+
+ cls.c2 = env.generate_revision(
+ util.rev_id(),
+ "b->c2",
+ branch_labels="c2branch",
+ head=cls.b.revision,
+ splice=True,
+ )
+ cls.d2 = env.generate_revision(
+ util.rev_id(),
+ "c2->d2",
+ head=cls.c2.revision,
+ depends_on=(cls.c1.revision,),
+ )
+
+ @classmethod
+ def teardown_class(cls):
+ clear_staging_env()
+
+ def test_downgrade_independent_branch(self):
+ """c2branch depends on c1branch so can be taken down on its own.
+ Current behaviour also takes down the dependency unnecessarily."""
+ self._assert_downgrade(
+ "c2branch@{}".format(self.b.revision),
+ (self.d1.revision, self.d2.revision),
+ [
+ self.down_(self.d2),
+ self.down_(self.c2),
+ ],
+ set([self.d1.revision]),
+ )
+
+ def test_downgrade_branch_dependency(self):
+ """c2branch depends on c1branch so taking down c1branch requires taking
+ down both"""
+ destination = "c1branch@{}".format(self.b.revision)
+ source = self.d1.revision, self.d2.revision
+ revs = self.env._downgrade_revs(destination, source)
+ # Drops c1, d1 as requested, also drops d2 due to dependence on d1.
+ # Full ordering of migrations is not consistent so verify partial
+ # ordering only.
+ rev_ids = [rev.revision.revision for rev in revs]
+ assert set(rev_ids) == {
+ self.c1.revision,
+ self.d1.revision,
+ self.d2.revision,
+ }
+ assert rev_ids.index(self.d1.revision) < rev_ids.index(
+ self.c1.revision
+ )
+ assert rev_ids.index(self.d2.revision) < rev_ids.index(
+ self.c1.revision
+ )
+ # Verify final state.
+ heads = set(util.to_tuple(source, default=()))
+ head = HeadMaintainer(mock.Mock(), heads)
+ for rev in revs:
+ head.update_to_step(rev)
+ eq_(head.heads, set([self.c2.revision]))