with self._catch_revision_errors():
return self.revision_map.get_revisions(id_)
+ def get_all_current(self, id_):
+ with self._catch_revision_errors():
+ top_revs = set(self.revision_map.get_revisions(id_))
+ top_revs.update(
+ self.revision_map._get_ancestor_nodes(
+ list(top_revs), include_dependencies=True)
+ )
+ top_revs = self.revision_map._filter_into_branch_heads(top_revs)
+ return top_revs
+
def get_revision(self, id_):
"""Return the :class:`.Script` instance with the given rev id.
" (head)" if self._is_real_head else "",
" (effective head)" if self.is_head and
not self._is_real_head else ""
- )
+ )
if tree_indicators:
text += "%s%s" % (
" (branchpoint)" if self.is_branch_point else "",
- " (mergepoint)" if self.is_merge_point else "",
+ " (mergepoint)" if self.is_merge_point else ""
)
if include_doc:
text += ", %s" % self.doc
_sqlite_file_db, write_script, env_file_fixture
from alembic.testing import eq_, assert_raises_message, mock
from alembic import util
+from contextlib import contextmanager
+import re
-class HistoryTest(TestBase):
+class _BufMixin(object):
+ def _buf_fixture(self):
+ # try to simulate how sys.stdout looks - we send it u''
+ # but then it's trying to encode to something.
+ buf = BytesIO()
+ wrapper = TextIOWrapper(buf, encoding='ascii', line_buffering=True)
+ wrapper.getvalue = buf.getvalue
+ return wrapper
+
+
+class HistoryTest(_BufMixin, TestBase):
@classmethod
def setup_class(cls):
]).encode("ascii", "replace").decode("ascii").strip()
)
- def _buf_fixture(self):
- # try to simulate how sys.stdout looks - we send it u''
- # but then it's trying to encode to something.
- buf = BytesIO()
- wrapper = TextIOWrapper(buf, encoding='ascii', line_buffering=True)
- wrapper.getvalue = buf.getvalue
- return wrapper
-
def test_history_full(self):
self.cfg.stdout = buf = self._buf_fixture()
command.history(self.cfg, verbose=True)
self._eq_cmd_output(buf, [self.c, self.b, self.a])
+class CurrentTest(_BufMixin, TestBase):
+
+ @classmethod
+ def setup_class(cls):
+ cls.env = env = staging_env()
+ cls.cfg = _sqlite_testing_config()
+ cls.a1 = env.generate_revision("a1", "a1")
+ cls.a2 = env.generate_revision("a2", "a2")
+ cls.a3 = env.generate_revision("a3", "a3")
+ cls.b1 = env.generate_revision("b1", "b1", head="base")
+ cls.b2 = env.generate_revision("b2", "b2", head="b1", depends_on="a2")
+ cls.b3 = env.generate_revision("b3", "b3", head="b2")
+
+ @classmethod
+ def teardown_class(cls):
+ clear_staging_env()
+
+ @contextmanager
+ def _assert_lines(self, revs):
+ self.cfg.stdout = buf = self._buf_fixture()
+
+ yield
+
+ lines = set([
+ re.match(r'(^.\w)', elem).group(1)
+ for elem in re.split(
+ "\n",
+ buf.getvalue().decode('ascii', 'replace').strip()) if elem])
+
+ eq_(lines, set(revs))
+
+ def test_no_current(self):
+ command.stamp(self.cfg, ())
+ with self._assert_lines([]):
+ command.current(self.cfg)
+
+ def test_plain_current(self):
+ command.stamp(self.cfg, ())
+ command.stamp(self.cfg, self.a3.revision)
+ with self._assert_lines(['a3']):
+ command.current(self.cfg)
+
+ def test_two_heads(self):
+ command.stamp(self.cfg, ())
+ command.stamp(self.cfg, (self.a1.revision, self.b1.revision))
+ with self._assert_lines(['a1', 'b1']):
+ command.current(self.cfg)
+
+ def test_heads_one_is_dependent(self):
+ command.stamp(self.cfg, ())
+ command.stamp(self.cfg, (self.b2.revision, ))
+ with self._assert_lines(['a2', 'b2']):
+ command.current(self.cfg)
+
+ def test_heads_upg(self):
+ command.stamp(self.cfg, (self.b2.revision, ))
+ command.upgrade(self.cfg, (self.b3.revision))
+ with self._assert_lines(['a2', 'b3']):
+ command.current(self.cfg)
+
+
class RevisionTest(TestBase):
def setUp(self):
self.env = staging_env()