From: Mike Bayer Date: Mon, 14 Nov 2011 23:12:04 +0000 (-0500) Subject: get env to have all the arguments before and after context is set up X-Git-Tag: rel_0_1_0~59 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c8bcd152b8ac822a85f4e1539b3748c4461bdceb;p=thirdparty%2Fsqlalchemy%2Falembic.git get env to have all the arguments before and after context is set up --- diff --git a/alembic/context.py b/alembic/context.py index 4cf36284..842c4db6 100644 --- a/alembic/context.py +++ b/alembic/context.py @@ -225,6 +225,11 @@ def _opts(cfg, script, **kw): _script = script config = cfg +def _clear(): + global _context_opts, _context, _script + _context = _script = None + _context_opts = {} + def requires_connection(): """Return True if the current migrations environment should have an active database connection. @@ -234,11 +239,7 @@ def requires_connection(): def get_head_revision(): """Return the value of the 'head' revision.""" - rev = _script._get_rev('head') - if rev is not None: - return rev.revision - else: - return None + return _script._as_rev_number("head") def get_starting_revision_argument(): """Return the 'starting revision' argument, @@ -247,7 +248,12 @@ def get_starting_revision_argument(): This is only usable in "offline" mode. """ - return get_context()._start_from_rev + if _context is not None: + return _script._as_rev_number(get_context()._start_from_rev) + elif 'starting_rev' in _context_opts: + return _script._as_rev_number(_context_opts['starting_rev']) + else: + raise util.CommandError("No starting revision argument is available.") def get_revision_argument(): """Get the 'destination' revision argument. @@ -257,11 +263,17 @@ def get_revision_argument(): as is 'base' which is translated to None. """ - return get_context().destination_rev + if _context is not None: + return _script._as_rev_number(get_context().destination_rev) + else: + return _script._as_rev_number(_context_opts['destination_rev']) def get_tag_argument(): """Return the value passed for the ``--tag`` argument, if any.""" - return get_context().tag + if _context is not None: + return get_context().tag + else: + return _context_opts.get('tag', None) def configure( connection=None, diff --git a/alembic/script.py b/alembic/script.py index 64bc2269..4c4b3659 100644 --- a/alembic/script.py +++ b/alembic/script.py @@ -49,15 +49,19 @@ class ScriptDirectory(object): yield sc def _get_rev(self, id_): - if id_ == 'head': - id_ = self._current_head() - elif id_ == 'base': - id_ = None + id_ = self._as_rev_number(id_) try: return self._revision_map[id_] except KeyError: raise util.CommandError("No such revision %s" % id_) + def _as_rev_number(self, id_): + if id_ == 'head': + id_ = self._current_head() + elif id_ == 'base': + id_ = None + return id_ + def _revs(self, upper, lower): lower = self._get_rev(lower) upper = self._get_rev(upper) diff --git a/alembic/templates/pylons/env.py b/alembic/templates/pylons/env.py index e202cea5..799ba025 100644 --- a/alembic/templates/pylons/env.py +++ b/alembic/templates/pylons/env.py @@ -4,19 +4,19 @@ Place 'pylons_config_file' into alembic.ini, and the application will be loaded from there. """ -from alembic import options, context +from alembic import config, context from paste.deploy import loadapp import logging try: # if pylons app already in, don't create a new app - from pylons import config - config['__file__'] + from pylons import config as pylons_config + pylons_config['__file__'] except: # can use config['__file__'] here, i.e. the Pylons # ini file, instead of alembic.ini - config_file = options.get_main_option('pylons_config_file') - config_file = options.config_file_name + config_file = config.get_main_option('pylons_config_file') + config_file = config.config_file_name logging.config.fileConfig(config_file) wsgi_app = loadapp('config:%s' % config_file, relative_to='.') diff --git a/docs/build/tutorial.rst b/docs/build/tutorial.rst index af84b3af..4b64ea11 100644 --- a/docs/build/tutorial.rst +++ b/docs/build/tutorial.rst @@ -519,10 +519,10 @@ treat a local file in the same way ``alembic_version`` works:: if not context.requires_connection(): version_file = os.path.join(os.path.dirname(config.config_file_name), "version.txt")) current_version = file_(version_file).read() - context.configure(dialect_name=engine.name, current_version=current_version) - start, end = context.run_migrations() - if end: - file_(version_file, 'w').write(end) + context.configure(dialect_name=engine.name, starting_version=current_version) + end_version = context.get_revision_argument() + context.run_migrations() + file_(version_file, 'w').write(end) Writing Migration Scripts to Support Script Generation ------------------------------------------------------ diff --git a/tests/__init__.py b/tests/__init__.py index 8788f1f3..68c72220 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -55,6 +55,14 @@ def ne_(a, b, msg=None): """Assert a != b, with repr messaging on failure.""" assert a != b, msg or "%r == %r" % (a, b) +def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): + try: + callable_(*args, **kwargs) + assert False, "Callable did not raise an exception" + except except_cls, e: + assert re.search(msg, str(e)), "%r !~ %s" % (msg, e) + print str(e) + def _testing_config(): from alembic.config import Config if not os.access(staging_directory, os.F_OK): @@ -193,7 +201,7 @@ def staging_env(create=True): def clear_staging_env(): shutil.rmtree(staging_directory, True) - + context._clear() def three_rev_fixture(cfg): a = util.rev_id() diff --git a/tests/test_offline_environment.py b/tests/test_offline_environment.py index c8847d5f..a71bb99c 100644 --- a/tests/test_offline_environment.py +++ b/tests/test_offline_environment.py @@ -1,72 +1,135 @@ from tests import clear_staging_env, staging_env, \ _no_sql_testing_config, sqlite_db, eq_, ne_, \ - capture_context_buffer, three_rev_fixture, _env_file_fixture + capture_context_buffer, three_rev_fixture, _env_file_fixture,\ + assert_raises_message from alembic import command, util +from unittest import TestCase -def setup(): - global cfg, env - env = staging_env() - cfg = _no_sql_testing_config() - global a, b, c - a, b, c = three_rev_fixture(cfg) +class OfflineEnvironmentTest(TestCase): + def setUp(self): + env = staging_env() + self.cfg = _no_sql_testing_config() -def teardown(): - clear_staging_env() + global a, b, c + a, b, c = three_rev_fixture(self.cfg) -def test_not_requires_connection(): - _env_file_fixture(""" + def tearDown(self): + clear_staging_env() + + def test_not_requires_connection(self): + _env_file_fixture(""" assert not context.requires_connection() """) - command.upgrade(cfg, a, sql=True) - command.downgrade(cfg, a, sql=True) + command.upgrade(self.cfg, a, sql=True) + command.downgrade(self.cfg, a, sql=True) -def test_requires_connection(): - _env_file_fixture(""" + def test_requires_connection(self): + _env_file_fixture(""" assert context.requires_connection() """) - command.upgrade(cfg, a) - command.downgrade(cfg, a) + command.upgrade(self.cfg, a) + command.downgrade(self.cfg, a) -def test_starting_rev(): - _env_file_fixture(""" + def test_starting_rev_post_context(self): + _env_file_fixture(""" context.configure(dialect_name='sqlite', starting_rev='x') assert context.get_starting_revision_argument() == 'x' """) - command.upgrade(cfg, a, sql=True) - command.downgrade(cfg, a, sql=True) + command.upgrade(self.cfg, a, sql=True) + command.downgrade(self.cfg, a, sql=True) + command.current(self.cfg) + command.stamp(self.cfg, a) + def test_starting_rev_pre_context(self): + _env_file_fixture(""" +assert context.get_starting_revision_argument() == 'x' +""") + command.upgrade(self.cfg, "x:y", sql=True) + command.downgrade(self.cfg, "x:y", sql=True) + command.stamp(self.cfg, a) + + def test_starting_rev_current_pre_context(self): + _env_file_fixture(""" +assert context.get_starting_revision_argument() is None +""") + assert_raises_message( + util.CommandError, + "No starting revision argument is available.", + command.current, self.cfg + ) -def test_destination_rev(): - _env_file_fixture(""" + def test_destination_rev_pre_context(self): + _env_file_fixture(""" +assert context.get_revision_argument() == '%s' +""" % b) + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True) + command.stamp(self.cfg, b, sql=True) + + def test_destination_rev_post_context(self): + _env_file_fixture(""" context.configure(dialect_name='sqlite') assert context.get_revision_argument() == '%s' """ % b) - command.upgrade(cfg, b, sql=True) - command.downgrade(cfg, b, sql=True) + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True) + command.stamp(self.cfg, b, sql=True) + def test_head_rev_pre_context(self): + _env_file_fixture(""" +assert context.get_head_revision() == '%s' +""" % c) + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True) + command.stamp(self.cfg, b, sql=True) + command.current(self.cfg) -def test_head_rev(): - _env_file_fixture(""" + def test_head_rev_post_context(self): + _env_file_fixture(""" context.configure(dialect_name='sqlite') assert context.get_head_revision() == '%s' """ % c) - command.upgrade(cfg, b, sql=True) - command.downgrade(cfg, b, sql=True) + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True) + command.stamp(self.cfg, b, sql=True) + command.current(self.cfg) -def test_tag_cmd_arg(): - _env_file_fixture(""" + def test_tag_pre_context(self): + _env_file_fixture(""" +assert context.get_tag_argument() == 'hi' +""") + command.upgrade(self.cfg, b, sql=True, tag='hi') + command.downgrade(self.cfg, b, sql=True, tag='hi') + + def test_tag_pre_context_None(self): + _env_file_fixture(""" +assert context.get_tag_argument() is None +""") + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True) + + def test_tag_cmd_arg(self): + _env_file_fixture(""" context.configure(dialect_name='sqlite') assert context.get_tag_argument() == 'hi' """) - command.upgrade(cfg, b, sql=True, tag='hi') - command.downgrade(cfg, b, sql=True, tag='hi') + command.upgrade(self.cfg, b, sql=True, tag='hi') + command.downgrade(self.cfg, b, sql=True, tag='hi') -def test_tag_cfg_arg(): - _env_file_fixture(""" + def test_tag_cfg_arg(self): + _env_file_fixture(""" context.configure(dialect_name='sqlite', tag='there') assert context.get_tag_argument() == 'there' """) - command.upgrade(cfg, b, sql=True, tag='hi') - command.downgrade(cfg, b, sql=True, tag='hi') + command.upgrade(self.cfg, b, sql=True, tag='hi') + command.downgrade(self.cfg, b, sql=True, tag='hi') + + def test_tag_None(self): + _env_file_fixture(""" +context.configure(dialect_name='sqlite') +assert context.get_tag_argument() is None +""") + command.upgrade(self.cfg, b, sql=True) + command.downgrade(self.cfg, b, sql=True)