it results in any actual database operations)."""
up_revision_id = None
- """Version string corresponding to :attr:`.Revision.revision`."""
+ """Version string corresponding to :attr:`.Revision.revision`.
+
+ In the case of a stamp operation, it is advised to use the
+ :attr:`.MigrationInfo.up_revision_ids` tuple as a stamp operation can
+ make a single movement from one or more branches down to a single
+ branchpoint, in which case there will be multiple "up" revisions.
+
+ .. seealso::
+
+ :attr:`.MigrationInfo.up_revision_ids`
+
+ """
+
+ up_revision_ids = None
+ """Tuple of version strings corresponding to :attr:`.Revision.revision`.
+
+ In the majority of cases, this tuple will be a single value, synonomous
+ with the scalar value of :attr:`.MigrationInfo.up_revision_id`.
+ It can be multiple revision identifiers only in the case of an
+ ``alembic stamp`` operation which is moving downwards from multiple
+ branches down to their common branch point.
+
+ .. versionadded:: 0.9.4
+
+ """
down_revision_ids = None
"""Tuple of strings representing the base revisions of this migration step.
revision_map = None
"""The revision map inside of which this operation occurs."""
- def __init__(self, revision_map, is_upgrade, is_stamp, up_revision,
+ def __init__(self, revision_map, is_upgrade, is_stamp, up_revisions,
down_revisions):
self.revision_map = revision_map
self.is_upgrade = is_upgrade
self.is_stamp = is_stamp
- self.up_revision_id = up_revision
- self.down_revision_ids = util.to_tuple(down_revisions)
+ self.up_revision_ids = util.to_tuple(up_revisions, default=())
+ if self.up_revision_ids:
+ self.up_revision_id = self.up_revision_ids[0]
+ else:
+ # this should never be the case with
+ # "upgrade", "downgrade", or "stamp" as we are always
+ # measuring movement in terms of at least one upgrade version
+ self.up_revision_id = None
+ self.down_revision_ids = util.to_tuple(down_revisions, default=())
@property
def is_migration(self):
@property
def source_revision_ids(self):
"""Active revisions before this migration step is applied."""
- revs = self.down_revision_ids if self.is_upgrade \
- else self.up_revision_id
- return util.to_tuple(revs, default=())
+ return self.down_revision_ids if self.is_upgrade \
+ else self.up_revision_ids
@property
def destination_revision_ids(self):
"""Active revisions after this migration step is applied."""
- revs = self.up_revision_id if self.is_upgrade \
+ return self.up_revision_ids if self.is_upgrade \
else self.down_revision_ids
- return util.to_tuple(revs, default=())
@property
def up_revision(self):
- """Get :attr:`~MigrationInfo.up_revision_id` as a :class:`.Revision`."""
+ """Get :attr:`~.MigrationInfo.up_revision_id` as a :class:`.Revision`."""
return self.revision_map.get_revision(self.up_revision_id)
+ @property
+ def up_revisions(self):
+ """Get :attr:`~.MigrationInfo.up_revision_ids` as a :class:`.Revision`.
+
+ .. versionadded:: 0.9.4
+
+ """
+ return self.revision_map.get_revisions(self.up_revision_ids)
+
@property
def down_revisions(self):
- """Get :attr:`~MigrationInfo.down_revision_ids` as a tuple of
+ """Get :attr:`~.MigrationInfo.down_revision_ids` as a tuple of
:class:`Revisions <.Revision>`."""
return self.revision_map.get_revisions(self.down_revision_ids)
@property
def info(self):
return MigrationInfo(revision_map=self.revision_map,
- up_revision=self.revision.revision,
+ up_revisions=self.revision.revision,
down_revisions=self.revision._all_down_revisions,
is_upgrade=self.is_upgrade, is_stamp=False)
def info(self):
up, down = (self.to_, self.from_) if self.is_upgrade \
else (self.from_, self.to_)
- return MigrationInfo(self.revision_map, up, down, self.is_upgrade,
- True)
+ return MigrationInfo(revision_map=self.revision_map,
+ up_revisions=up,
+ down_revisions=down,
+ is_upgrade=self.is_upgrade,
+ is_stamp=True)
self._test_004_downgrade()
self._test_005_upgrade()
self._test_006_upgrade_again()
+ self._test_007_stamp_upgrade()
def _test_001_revisions(self):
self.a = a = util.rev_id()
assert db.dialect.has_table(db.connect(), 'bar')
assert not db.dialect.has_table(db.connect(), 'bat')
+ def _test_007_stamp_upgrade(self):
+ command.stamp(self.cfg, self.c)
+ db = self.bind
+ assert db.dialect.has_table(db.connect(), 'foo')
+ assert db.dialect.has_table(db.connect(), 'bar')
+ assert not db.dialect.has_table(db.connect(), 'bat')
+
class SourcelessApplyVersionsTest(ApplyVersionsFunctionalTest):
sourceless = True
assert hasattr(kw['ctx'], 'get_current_revision')
step = kw['step']
- assert isinstance(getattr(step, 'is_upgrade', None), bool)
- assert isinstance(getattr(step, 'is_stamp', None), bool)
- assert isinstance(getattr(step, 'is_migration', None), bool)
- assert isinstance(getattr(step, 'up_revision_id', None),
- compat.string_types)
- assert isinstance(getattr(step, 'up_revision', None), Script)
- for revtype in 'down', 'source', 'destination':
+ assert isinstance(step.is_upgrade, bool)
+ assert isinstance(step.is_stamp, bool)
+ assert isinstance(step.is_migration, bool)
+ assert isinstance(step.up_revision_id, compat.string_types)
+ assert isinstance(step.up_revision, Script)
+
+ for revtype in 'up', 'down', 'source', 'destination':
revs = getattr(step, '%s_revisions' % revtype)
assert isinstance(revs, tuple)
for rev in revs: