]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
get env to have all the arguments before and after context is set up
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Nov 2011 23:12:04 +0000 (18:12 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Nov 2011 23:12:04 +0000 (18:12 -0500)
alembic/context.py
alembic/script.py
alembic/templates/pylons/env.py
docs/build/tutorial.rst
tests/__init__.py
tests/test_offline_environment.py

index 4cf362842d1feab913bcce63e6e60d2e129cad08..842c4db6ba974cb6a65c5aeb0bb53a3bb40ff2ea 100644 (file)
@@ -225,6 +225,11 @@ def _opts(cfg, script, **kw):
     _script = script
     config = cfg
 
+def _clear():
+    global _context_opts, _context, _script
+    _context = _script = None
+    _context_opts = {}
+
 def requires_connection():
     """Return True if the current migrations environment should have
     an active database connection.
@@ -234,11 +239,7 @@ def requires_connection():
 
 def get_head_revision():
     """Return the value of the 'head' revision."""
-    rev = _script._get_rev('head')
-    if rev is not None:
-        return rev.revision
-    else:
-        return None
+    return _script._as_rev_number("head")
 
 def get_starting_revision_argument():
     """Return the 'starting revision' argument,
@@ -247,7 +248,12 @@ def get_starting_revision_argument():
     This is only usable in "offline" mode.
 
     """
-    return get_context()._start_from_rev
+    if _context is not None:
+        return _script._as_rev_number(get_context()._start_from_rev)
+    elif 'starting_rev' in _context_opts:
+        return _script._as_rev_number(_context_opts['starting_rev'])
+    else:
+        raise util.CommandError("No starting revision argument is available.")
 
 def get_revision_argument():
     """Get the 'destination' revision argument.
@@ -257,11 +263,17 @@ def get_revision_argument():
     as is 'base' which is translated to None.
 
     """
-    return get_context().destination_rev
+    if _context is not None:
+        return _script._as_rev_number(get_context().destination_rev)
+    else:
+        return _script._as_rev_number(_context_opts['destination_rev'])
 
 def get_tag_argument():
     """Return the value passed for the ``--tag`` argument, if any."""
-    return get_context().tag
+    if _context is not None:
+        return get_context().tag
+    else:
+        return _context_opts.get('tag', None)
 
 def configure(
         connection=None,
index 64bc226985cf1e6e0de647eb8b22a1fe898e9127..4c4b3659d791313edf42e0de0949bef9820911b2 100644 (file)
@@ -49,15 +49,19 @@ class ScriptDirectory(object):
                         yield sc
 
     def _get_rev(self, id_):
-        if id_ == 'head':
-            id_ = self._current_head()
-        elif id_ == 'base':
-            id_ = None
+        id_ = self._as_rev_number(id_)
         try:
             return self._revision_map[id_]
         except KeyError:
             raise util.CommandError("No such revision %s" % id_)
 
+    def _as_rev_number(self, id_):
+        if id_ == 'head':
+            id_ = self._current_head()
+        elif id_ == 'base':
+            id_ = None
+        return id_
+
     def _revs(self, upper, lower):
         lower = self._get_rev(lower)
         upper = self._get_rev(upper)
index e202cea5c8a934f27ac396561f884a34be16f865..799ba0256bf8598d03ae5d96a24c81eeea90ddfb 100644 (file)
@@ -4,19 +4,19 @@ Place 'pylons_config_file' into alembic.ini, and the application will
 be loaded from there.
 
 """
-from alembic import options, context
+from alembic import config, context
 from paste.deploy import loadapp
 import logging
 
 try:
     # if pylons app already in, don't create a new app
-    from pylons import config
-    config['__file__']
+    from pylons import config as pylons_config
+    pylons_config['__file__']
 except:
     # can use config['__file__'] here, i.e. the Pylons
     # ini file, instead of alembic.ini
-    config_file = options.get_main_option('pylons_config_file')
-    config_file = options.config_file_name
+    config_file = config.get_main_option('pylons_config_file')
+    config_file = config.config_file_name
     logging.config.fileConfig(config_file)
     wsgi_app = loadapp('config:%s' % config_file, relative_to='.')
 
index af84b3afe8d264ca6eb5f49ae8be8fcaacb2f11d..4b64ea11d1a584a015d0b2037bc0f9dc92e50d78 100644 (file)
@@ -519,10 +519,10 @@ treat a local file in the same way ``alembic_version`` works::
     if not context.requires_connection():
         version_file = os.path.join(os.path.dirname(config.config_file_name), "version.txt"))
         current_version = file_(version_file).read()
-        context.configure(dialect_name=engine.name, current_version=current_version)
-        start, end = context.run_migrations()
-        if end:
-            file_(version_file, 'w').write(end)
+        context.configure(dialect_name=engine.name, starting_version=current_version)
+        end_version = context.get_revision_argument()
+        context.run_migrations()
+        file_(version_file, 'w').write(end)
 
 Writing Migration Scripts to Support Script Generation
 ------------------------------------------------------
index 8788f1f3c9cba4da079f4dd7b8a7e3bd7665dc2d..68c7222056e38007088c777620caa8ff6a0e2abe 100644 (file)
@@ -55,6 +55,14 @@ def ne_(a, b, msg=None):
     """Assert a != b, with repr messaging on failure."""
     assert a != b, msg or "%r == %r" % (a, b)
 
+def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
+    try:
+        callable_(*args, **kwargs)
+        assert False, "Callable did not raise an exception"
+    except except_cls, e:
+        assert re.search(msg, str(e)), "%r !~ %s" % (msg, e)
+        print str(e)
+
 def _testing_config():
     from alembic.config import Config
     if not os.access(staging_directory, os.F_OK):
@@ -193,7 +201,7 @@ def staging_env(create=True):
 
 def clear_staging_env():
     shutil.rmtree(staging_directory, True)
-
+    context._clear()
 
 def three_rev_fixture(cfg):
     a = util.rev_id()
index c8847d5f27e34fe00f4728932f2cd506e07a74a3..a71bb99c141c662412ee99ab54f249524a2f38b6 100644 (file)
 from tests import clear_staging_env, staging_env, \
     _no_sql_testing_config, sqlite_db, eq_, ne_, \
-    capture_context_buffer, three_rev_fixture, _env_file_fixture
+    capture_context_buffer, three_rev_fixture, _env_file_fixture,\
+    assert_raises_message
 from alembic import command, util
+from unittest import TestCase
 
-def setup():
-    global cfg, env
-    env = staging_env()
-    cfg = _no_sql_testing_config()
 
-    global a, b, c
-    a, b, c = three_rev_fixture(cfg)
+class OfflineEnvironmentTest(TestCase):
+    def setUp(self):
+        env = staging_env()
+        self.cfg = _no_sql_testing_config()
 
-def teardown():
-    clear_staging_env()
+        global a, b, c
+        a, b, c = three_rev_fixture(self.cfg)
 
-def test_not_requires_connection():
-    _env_file_fixture("""
+    def tearDown(self):
+        clear_staging_env()
+
+    def test_not_requires_connection(self):
+        _env_file_fixture("""
 assert not context.requires_connection()
 """)
-    command.upgrade(cfg, a, sql=True)
-    command.downgrade(cfg, a, sql=True)
+        command.upgrade(self.cfg, a, sql=True)
+        command.downgrade(self.cfg, a, sql=True)
 
-def test_requires_connection():
-    _env_file_fixture("""
+    def test_requires_connection(self):
+        _env_file_fixture("""
 assert context.requires_connection()
 """)
-    command.upgrade(cfg, a)
-    command.downgrade(cfg, a)
+        command.upgrade(self.cfg, a)
+        command.downgrade(self.cfg, a)
 
 
-def test_starting_rev():
-    _env_file_fixture("""
+    def test_starting_rev_post_context(self):
+        _env_file_fixture("""
 context.configure(dialect_name='sqlite', starting_rev='x')
 assert context.get_starting_revision_argument() == 'x'
 """)
-    command.upgrade(cfg, a, sql=True)
-    command.downgrade(cfg, a, sql=True)
+        command.upgrade(self.cfg, a, sql=True)
+        command.downgrade(self.cfg, a, sql=True)
+        command.current(self.cfg)
+        command.stamp(self.cfg, a)
 
+    def test_starting_rev_pre_context(self):
+        _env_file_fixture("""
+assert context.get_starting_revision_argument() == 'x'
+""")
+        command.upgrade(self.cfg, "x:y", sql=True)
+        command.downgrade(self.cfg, "x:y", sql=True)
+        command.stamp(self.cfg, a)
+
+    def test_starting_rev_current_pre_context(self):
+        _env_file_fixture("""
+assert context.get_starting_revision_argument() is None
+""")
+        assert_raises_message(
+            util.CommandError,
+            "No starting revision argument is available.",
+            command.current, self.cfg
+        )
 
-def test_destination_rev():
-    _env_file_fixture("""
+    def test_destination_rev_pre_context(self):
+        _env_file_fixture("""
+assert context.get_revision_argument() == '%s'
+""" % b)
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)
+        command.stamp(self.cfg, b, sql=True)
+
+    def test_destination_rev_post_context(self):
+        _env_file_fixture("""
 context.configure(dialect_name='sqlite')
 assert context.get_revision_argument() == '%s'
 """ % b)
-    command.upgrade(cfg, b, sql=True)
-    command.downgrade(cfg, b, sql=True)
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)
+        command.stamp(self.cfg, b, sql=True)
 
+    def test_head_rev_pre_context(self):
+        _env_file_fixture("""
+assert context.get_head_revision() == '%s'
+""" % c)
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)
+        command.stamp(self.cfg, b, sql=True)
+        command.current(self.cfg)
 
-def test_head_rev():
-    _env_file_fixture("""
+    def test_head_rev_post_context(self):
+        _env_file_fixture("""
 context.configure(dialect_name='sqlite')
 assert context.get_head_revision() == '%s'
 """ % c)
-    command.upgrade(cfg, b, sql=True)
-    command.downgrade(cfg, b, sql=True)
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)
+        command.stamp(self.cfg, b, sql=True)
+        command.current(self.cfg)
 
-def test_tag_cmd_arg():
-    _env_file_fixture("""
+    def test_tag_pre_context(self):
+        _env_file_fixture("""
+assert context.get_tag_argument() == 'hi'
+""")
+        command.upgrade(self.cfg, b, sql=True, tag='hi')
+        command.downgrade(self.cfg, b, sql=True, tag='hi')
+
+    def test_tag_pre_context_None(self):
+        _env_file_fixture("""
+assert context.get_tag_argument() is None
+""")
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)
+
+    def test_tag_cmd_arg(self):
+        _env_file_fixture("""
 context.configure(dialect_name='sqlite')
 assert context.get_tag_argument() == 'hi'
 """)
-    command.upgrade(cfg, b, sql=True, tag='hi')
-    command.downgrade(cfg, b, sql=True, tag='hi')
+        command.upgrade(self.cfg, b, sql=True, tag='hi')
+        command.downgrade(self.cfg, b, sql=True, tag='hi')
 
-def test_tag_cfg_arg():
-    _env_file_fixture("""
+    def test_tag_cfg_arg(self):
+        _env_file_fixture("""
 context.configure(dialect_name='sqlite', tag='there')
 assert context.get_tag_argument() == 'there'
 """)
-    command.upgrade(cfg, b, sql=True, tag='hi')
-    command.downgrade(cfg, b, sql=True, tag='hi')
+        command.upgrade(self.cfg, b, sql=True, tag='hi')
+        command.downgrade(self.cfg, b, sql=True, tag='hi')
+
+    def test_tag_None(self):
+        _env_file_fixture("""
+context.configure(dialect_name='sqlite')
+assert context.get_tag_argument() is None
+""")
+        command.upgrade(self.cfg, b, sql=True)
+        command.downgrade(self.cfg, b, sql=True)