are now contextual objects, not modules.
[#19]
+- [feature] The naming of revision files can
+ now be customized to be some combination
+ of "rev id" and "slug", the latter of which
+ is based on the revision message.
+ By default, the pattern "<rev>_<slug>"
+ is used for new files. New script files
+ should include the "revision" variable
+ for this to work, which is part of
+ the newer script.py.mako scripts.
+ [#24]
+
0.1.2
=====
- [bug] fix the config.main() function to honor
here = os.path.abspath(os.path.dirname(self.config_file_name))
else:
here = ""
- file_config = ConfigParser.ConfigParser({'here':here})
+ file_config = ConfigParser.SafeConfigParser({'here':here})
if self.config_file_name:
file_config.read([self.config_file_name])
else:
import inspect
import datetime
-_rev_file = re.compile(r'([a-z0-9]+)\.py$')
+_rev_file = re.compile(r'([a-z0-9A-Z]+)(?:_.*)?\.py$')
+_legacy_rev = re.compile(r'([a-f0-9]+)\.py$')
_mod_def_re = re.compile(r'(upgrade|downgrade)_([a-z0-9]+)')
+_slug_re = re.compile(r'\w+')
+_default_file_template = "%(rev)s_%(slug)s"
class ScriptDirectory(object):
"""Provides operations upon an Alembic script directory.
"""
- def __init__(self, dir):
+ def __init__(self, dir, file_template=_default_file_template):
self.dir = dir
self.versions = os.path.join(self.dir, 'versions')
+ self.file_template = file_template
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
@classmethod
def from_config(cls, config):
return ScriptDirectory(
- config.get_main_option('script_location'))
+ config.get_main_option('script_location'),
+ file_template = config.get_main_option(
+ 'file_template',
+ _default_file_template)
+ )
def walk_revisions(self):
"""Iterate through all revisions.
if script is None:
continue
if script.revision in map_:
+ import pdb
+ pdb.set_trace()
util.warn("Revision %s is present more than once" %
script.revision)
map_[script.revision] = script
map_[None] = None
return map_
- def _rev_path(self, rev_id):
- filename = "%s.py" % rev_id
+ def _rev_path(self, rev_id, message):
+ slug = "_".join(_slug_re.findall(message or "")[0:20])
+ filename = "%s.py" % (
+ self.file_template % {'rev':rev_id, 'slug':slug}
+ )
return os.path.join(self.versions, filename)
- def write(self, rev_id, content):
- path = self._rev_path(rev_id)
- with open(path, 'w') as fp:
- fp.write(content)
- pyc_path = util.pyc_file_from_path(path)
- if os.access(pyc_path, os.F_OK):
- os.unlink(pyc_path)
- script = Script.from_path(path)
- old = self._revision_map[script.revision]
- if old.down_revision != script.down_revision:
- raise Exception("Can't change down_revision "
- "on a refresh operation.")
- self._revision_map[script.revision] = script
- script.nextrev = old.nextrev
-
def _current_head(self):
current_heads = self._get_heads()
if len(current_heads) > 1:
def generate_rev(self, revid, message, refresh=False, **kw):
current_head = self._current_head()
- path = self._rev_path(revid)
+ path = self._rev_path(revid, message)
self.generate_template(
os.path.join(self.dir, "script.py.mako"),
path,
"""Represent a single revision file in a ``versions/`` directory."""
nextrev = frozenset()
- def __init__(self, module, rev_id):
+ def __init__(self, module, rev_id, path):
self.module = module
self.revision = rev_id
+ self.path = path
self.down_revision = getattr(module, 'down_revision', None)
@property
if not m:
return None
module = util.load_python_file(dir_, filename)
- return Script(module, m.group(1))
+ if not hasattr(module, "revision"):
+ # attempt to get the revision id from the script name,
+ # this for legacy only
+ m = _legacy_rev.match(filename)
+ if not m:
+ raise util.CommandError(
+ "Could not determine revision id from filename %s. "
+ "Be sure the 'revision' variable is "
+ "declared inside the script." % filename)
+ else:
+ revision = m.group(1)
+ else:
+ revision = module.revision
+ return Script(module, revision, os.path.join(dir_, filename))
"""
-# downgrade revision identifier, used by Alembic.
+# revision identifiers, used by Alembic.
+revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
"""
-# downgrade revision identifier, used by Alembic.
+# revision identifiers, used by Alembic.
+revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
"""
-# downgrade revision identifier, used by Alembic.
+# revision identifiers, used by Alembic.
+revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import re
import alembic
from alembic.operations import Operations
-from alembic.script import ScriptDirectory
+from alembic.script import ScriptDirectory, Script
from alembic import ddl
import StringIO
from alembic.ddl.impl import _impls
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.util import decorator
import shutil
+import textwrap
staging_directory = os.path.join(os.path.dirname(__file__), 'scratch')
files_directory = os.path.join(os.path.dirname(__file__), 'files')
def clear_staging_env():
shutil.rmtree(staging_directory, True)
+
+def write_script(scriptdir, rev_id, content):
+ old = scriptdir._revision_map[rev_id]
+ path = old.path
+ with open(path, 'w') as fp:
+ fp.write(textwrap.dedent(content))
+ pyc_path = util.pyc_file_from_path(path)
+ if os.access(pyc_path, os.F_OK):
+ os.unlink(pyc_path)
+ script = Script.from_path(path)
+ old = scriptdir._revision_map[script.revision]
+ if old.down_revision != script.down_revision:
+ raise Exception("Can't change down_revision "
+ "on a refresh operation.")
+ scriptdir._revision_map[script.revision] = script
+ script.nextrev = old.nextrev
+
+
def three_rev_fixture(cfg):
a = util.rev_id()
b = util.rev_id()
c = util.rev_id()
script = ScriptDirectory.from_config(cfg)
- script.generate_rev(a, None, refresh=True)
- script.write(a, """
+ script.generate_rev(a, "revision a", refresh=True)
+ write_script(script, a, """
+revision = '%s'
down_revision = None
from alembic import op
def downgrade():
op.execute("DROP STEP 1")
-""")
+""" % a)
- script.generate_rev(b, None, refresh=True)
- script.write(b, """
+ script.generate_rev(b, "revision b", refresh=True)
+ write_script(script, b, """
+revision = '%s'
down_revision = '%s'
from alembic import op
def downgrade():
op.execute("DROP STEP 2")
-""" % a)
+""" % (b, a))
- script.generate_rev(c, None, refresh=True)
- script.write(c, """
+ script.generate_rev(c, "revision c", refresh=True)
+ write_script(script, c, """
+revision = '%s'
down_revision = '%s'
from alembic import op
def downgrade():
op.execute("DROP STEP 3")
-""" % b)
+""" % (c, b))
return a, b, c
\ No newline at end of file
def test_render_drop_table(self):
eq_(
- autogenerate._drop_table(Table("sometable", MetaData()), self.autogen_context),
+ autogenerate._drop_table(Table("sometable", MetaData()),
+ self.autogen_context),
"op.drop_table('sometable')"
)
def test_render_add_column(self):
eq_(
autogenerate._add_column(
- "foo", Column("x", Integer, server_default="5"), self.autogen_context),
+ "foo", Column("x", Integer, server_default="5"),
+ self.autogen_context),
"op.add_column('foo', sa.Column('x', sa.Integer(), "
"server_default='5', nullable=True))"
)
def test_render_drop_column(self):
eq_(
autogenerate._drop_column(
- "foo", Column("x", Integer, server_default="5"), self.autogen_context),
+ "foo", Column("x", Integer, server_default="5"),
+ self.autogen_context),
"op.drop_column('foo', 'x')"
)
from __future__ import with_statement
from tests import op_fixture, db_for_dialect, eq_, staging_env, \
clear_staging_env, _no_sql_testing_config,\
- capture_context_buffer, requires_07
+ capture_context_buffer, requires_07, write_script
from unittest import TestCase
from sqlalchemy import DateTime, MetaData, Table, Column, text, Integer, String
from sqlalchemy.engine.reflection import Inspector
def _inline_enum_script(self):
- self.script.write(self.rid, """
+ write_script(self.script, self.rid, """
+revision = '%s'
down_revision = None
from alembic import op
def downgrade():
op.drop_table("sometable")
-""")
+""" % self.rid)
def _distinct_enum_script(self):
- self.script.write(self.rid, """
+ write_script(self.script, self.rid, """
+revision = '%s'
down_revision = None
from alembic import op
op.drop_table("sometable")
ENUM(name="pgenum").drop(op.get_bind(), checkfirst=False)
-""")
+""" % self.rid)
def test_offline_inline_enum_create(self):
self._inline_enum_script()
eq_(script.revision, abc)
eq_(script.down_revision, None)
assert os.access(
- os.path.join(env.dir, 'versions', '%s.py' % abc), os.F_OK)
+ os.path.join(env.dir, 'versions', '%s_this_is_a_message.py' % abc), os.F_OK)
assert callable(script.module.upgrade)
eq_(env._get_heads(), [abc])
def test_005_nextrev():
script = env.generate_rev(def_, "this is the next rev", refresh=True)
+ assert os.access(
+ os.path.join(env.dir, 'versions', '%s_this_is_the_next_rev.py' % def_), os.F_OK)
eq_(script.revision, def_)
eq_(script.down_revision, abc)
eq_(env._revision_map[abc].nextrev, set([def_]))
from __future__ import with_statement
-from tests import clear_staging_env, staging_env, _sqlite_testing_config, sqlite_db, eq_, ne_
+from tests import clear_staging_env, staging_env, \
+ _sqlite_testing_config, sqlite_db, eq_, ne_, write_script, \
+ assert_raises_message
from alembic import command, util
from alembic.script import ScriptDirectory
import time
+import unittest
+import os
-def test_001_revisions():
- global a, b, c
- a = util.rev_id()
- b = util.rev_id()
- c = util.rev_id()
+class VersioningTest(unittest.TestCase):
+ def test_001_revisions(self):
+ global a, b, c
+ a = util.rev_id()
+ b = util.rev_id()
+ c = util.rev_id()
- script = ScriptDirectory.from_config(cfg)
- script.generate_rev(a, None, refresh=True)
- script.write(a, """
-down_revision = None
+ script = ScriptDirectory.from_config(self.cfg)
+ script.generate_rev(a, None, refresh=True)
+ write_script(script, a, """
+ revision = '%s'
+ down_revision = None
-from alembic import op
+ from alembic import op
-def upgrade():
- op.execute("CREATE TABLE foo(id integer)")
+ def upgrade():
+ op.execute("CREATE TABLE foo(id integer)")
-def downgrade():
- op.execute("DROP TABLE foo")
+ def downgrade():
+ op.execute("DROP TABLE foo")
-""")
+ """ % a)
- script.generate_rev(b, None, refresh=True)
- script.write(b, """
-down_revision = '%s'
+ script.generate_rev(b, None, refresh=True)
+ write_script(script, b, """
+ revision = '%s'
+ down_revision = '%s'
-from alembic import op
+ from alembic import op
-def upgrade():
- op.execute("CREATE TABLE bar(id integer)")
+ def upgrade():
+ op.execute("CREATE TABLE bar(id integer)")
-def downgrade():
- op.execute("DROP TABLE bar")
+ def downgrade():
+ op.execute("DROP TABLE bar")
-""" % a)
+ """ % (b, a))
- script.generate_rev(c, None, refresh=True)
- script.write(c, """
-down_revision = '%s'
+ script.generate_rev(c, None, refresh=True)
+ write_script(script, c, """
+ revision = '%s'
+ down_revision = '%s'
-from alembic import op
+ from alembic import op
-def upgrade():
- op.execute("CREATE TABLE bat(id integer)")
+ def upgrade():
+ op.execute("CREATE TABLE bat(id integer)")
-def downgrade():
- op.execute("DROP TABLE bat")
+ def downgrade():
+ op.execute("DROP TABLE bat")
-""" % b)
+ """ % (c, b))
-def test_002_upgrade():
- command.upgrade(cfg, c)
- db = sqlite_db()
- assert db.dialect.has_table(db.connect(), 'foo')
- assert db.dialect.has_table(db.connect(), 'bar')
- assert db.dialect.has_table(db.connect(), 'bat')
+ def test_002_upgrade(self):
+ command.upgrade(self.cfg, c)
+ db = sqlite_db()
+ assert db.dialect.has_table(db.connect(), 'foo')
+ assert db.dialect.has_table(db.connect(), 'bar')
+ assert db.dialect.has_table(db.connect(), 'bat')
-def test_003_downgrade():
- command.downgrade(cfg, a)
- db = sqlite_db()
- assert db.dialect.has_table(db.connect(), 'foo')
- assert not db.dialect.has_table(db.connect(), 'bar')
- assert not db.dialect.has_table(db.connect(), 'bat')
+ def test_003_downgrade(self):
+ command.downgrade(self.cfg, a)
+ db = sqlite_db()
+ assert db.dialect.has_table(db.connect(), 'foo')
+ assert not db.dialect.has_table(db.connect(), 'bar')
+ assert not db.dialect.has_table(db.connect(), 'bat')
-def test_004_downgrade():
- command.downgrade(cfg, 'base')
- db = sqlite_db()
- assert not db.dialect.has_table(db.connect(), 'foo')
- assert not db.dialect.has_table(db.connect(), 'bar')
- assert not db.dialect.has_table(db.connect(), 'bat')
+ def test_004_downgrade(self):
+ command.downgrade(self.cfg, 'base')
+ db = sqlite_db()
+ assert not db.dialect.has_table(db.connect(), 'foo')
+ assert not db.dialect.has_table(db.connect(), 'bar')
+ assert not db.dialect.has_table(db.connect(), 'bat')
-def test_005_upgrade():
- command.upgrade(cfg, b)
- db = sqlite_db()
- assert db.dialect.has_table(db.connect(), 'foo')
- assert db.dialect.has_table(db.connect(), 'bar')
- assert not db.dialect.has_table(db.connect(), 'bat')
+ def test_005_upgrade(self):
+ command.upgrade(self.cfg, b)
+ db = sqlite_db()
+ assert db.dialect.has_table(db.connect(), 'foo')
+ assert db.dialect.has_table(db.connect(), 'bar')
+ assert not db.dialect.has_table(db.connect(), 'bat')
-def test_006_upgrade_again():
- command.upgrade(cfg, b)
+ def test_006_upgrade_again(self):
+ command.upgrade(self.cfg, b)
-# TODO: test some invalid movements
+ # TODO: test some invalid movements
+ @classmethod
+ def setup_class(cls):
+ cls.env = staging_env()
+ cls.cfg = _sqlite_testing_config()
-def setup():
- global cfg, env
- env = staging_env()
- cfg = _sqlite_testing_config()
+ @classmethod
+ def teardown_class(cls):
+ clear_staging_env()
+class VersionNameTemplateTest(unittest.TestCase):
+ def setUp(self):
+ self.env = staging_env()
+ self.cfg = _sqlite_testing_config()
+
+ def tearDown(self):
+ clear_staging_env()
+
+ def test_option(self):
+ self.cfg.set_main_option("file_template", "myfile_%%(slug)s")
+ script = ScriptDirectory.from_config(self.cfg)
+ a = util.rev_id()
+ script.generate_rev(a, "some message", refresh=True)
+ write_script(script, a, """
+ revision = '%s'
+ down_revision = None
+
+ from alembic import op
+
+ def upgrade():
+ op.execute("CREATE TABLE foo(id integer)")
+
+ def downgrade():
+ op.execute("DROP TABLE foo")
+
+ """ % a)
+
+ script = ScriptDirectory.from_config(self.cfg)
+ rev = script._get_rev(a)
+ eq_(rev.revision, a)
+ eq_(os.path.basename(rev.path), "myfile_some_message.py")
+
+ def test_lookup_legacy(self):
+ self.cfg.set_main_option("file_template", "%%(rev)s")
+ script = ScriptDirectory.from_config(self.cfg)
+ a = util.rev_id()
+ script.generate_rev(a, None, refresh=True)
+ write_script(script, a, """
+ down_revision = None
+
+ from alembic import op
+
+ def upgrade():
+ op.execute("CREATE TABLE foo(id integer)")
+
+ def downgrade():
+ op.execute("DROP TABLE foo")
+
+ """)
+
+ script = ScriptDirectory.from_config(self.cfg)
+ rev = script._get_rev(a)
+ eq_(rev.revision, a)
+ eq_(os.path.basename(rev.path), "%s.py" % a)
+
+ def test_error_on_new_with_missing_revision(self):
+ self.cfg.set_main_option("file_template", "%%(slug)s_%%(rev)s")
+ script = ScriptDirectory.from_config(self.cfg)
+ a = util.rev_id()
+ script.generate_rev(a, "foobar", refresh=True)
+ assert_raises_message(
+ util.CommandError,
+ "Could not determine revision id from filename foobar_%s.py. "
+ "Be sure the 'revision' variable is declared "
+ "inside the script." % a,
+ write_script, script, a, """
+ down_revision = None
+
+ from alembic import op
+
+ def upgrade():
+ op.execute("CREATE TABLE foo(id integer)")
+
+ def downgrade():
+ op.execute("DROP TABLE foo")
+
+ """)
-def teardown():
- clear_staging_env()
\ No newline at end of file