def list_templates(config):
"""List available templates"""
-
+
print "Available templates:\n"
for tempname in os.listdir(config.get_template_directory()):
        readme = os.path.join(
                        config.get_template_directory(),
                        tempname, 'README')
synopsis = open(readme).next()
print util.format_opt(tempname, synopsis)
-
+
print "\nTemplates are used via the 'init' command, e.g.:"
print "\n alembic init --template pylons ./scripts"
-
+
def init(config, directory, template='generic'):
"""Initialize a new scripts directory."""
-
+
if os.access(directory, os.F_OK):
raise util.CommandError("Directory %s already exists" % directory)
util.status("Creating directory %s" % os.path.abspath(directory),
os.makedirs, directory)
-
+
versions = os.path.join(directory, 'versions')
util.status("Creating directory %s" % os.path.abspath(versions),
os.makedirs, versions)
script = ScriptDirectory.from_config(config)
script.generate_rev(util.rev_id(), message)
-
+
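# Usage sketch: the command functions in this module are plain callables taking
# a Config object, as the test suite below also demonstrates.  The helper name,
# the "alembic.ini" path and the "./scripts" directory are illustrative only.
def _example_bootstrap():
    from alembic.config import Config
    from alembic import command

    cfg = Config("alembic.ini")                        # path to an existing ini file
    command.list_templates(cfg)                        # print the available templates
    command.init(cfg, "./scripts", template='generic') # create a new scripts directory
    return cfg
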
def upgrade(config, revision, sql=False):
"""Upgrade to a later version."""
as_sql = sql
)
script.run_env()
-
+
def downgrade(config, revision, sql=False):
"""Revert to a previous version."""
-
+
script = ScriptDirectory.from_config(config)
context.opts(
config,
for sc in script.walk_revisions():
if sc.is_branch_point:
print sc
-
+
def current(config):
"""Display the current revision for each database."""
-
+
script = ScriptDirectory.from_config(config)
def display_version(rev):
print "Current revision for %s: %s" % (
                            context.get_context().connection.engine.url,
script._get_rev(rev))
return []
-
+
context.opts(
config,
fn = display_version
- )
+ )
script.run_env()
-
+
def splice(config, parent, child):
"""'splice' two branches, creating a new revision file."""
-
-
+
+
import inspect
import os
import sys
-
+
class Config(object):
def __init__(self, file_):
self.config_file_name = file_
-
+
@util.memoized_property
def file_config(self):
file_config = ConfigParser.ConfigParser()
file_config.read([self.config_file_name])
return file_config
-
+
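    # Config wraps a plain ConfigParser ini file; ScriptDirectory.from_config()
    # below only needs the script_location main option, so a minimal file looks
    # roughly like this (the [alembic] section name and the sqlalchemy.url key
    # are assumptions for illustration):
    #
    #     [alembic]
    #     script_location = ./scripts
    #     sqlalchemy.url = sqlite:///foo.db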
def get_template_directory(self):
# TODO: what's the official way to get at
# setuptools-installed datafiles ?
class DefaultContext(object):
__metaclass__ = ContextMeta
__dialect__ = 'default'
-
+
transactional_ddl = False
as_sql = False
-
+
def __init__(self, connection, fn, as_sql=False):
self.connection = connection
self._migrations_fn = fn
self.as_sql = as_sql
-
+
def _current_rev(self):
if self.as_sql:
if not self.connection.dialect.has_table(self.connection, 'alembic_version'):
else:
_version.create(self.connection, checkfirst=True)
return self.connection.scalar(_version.select())
-
+
def _update_current_rev(self, old, new):
if old == new:
return
-
+
if new is None:
self._exec(_version.delete())
elif old is None:
self._exec(_version.insert().values(version_num=literal_column("'%s'" % new)))
else:
self._exec(_version.update().values(version_num=literal_column("'%s'" % new)))
-
+
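    # Version bookkeeping: the alembic_version table holds a single version_num
    # value; _update_current_rev() inserts it when upgrading from an empty
    # database (old is None), deletes it when downgrading back to base
    # (new is None), and otherwise rewrites it in place.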
def run_migrations(self, **kw):
log.info("Context class %s.", self.__class__.__name__)
log.info("Will assume %s DDL.",
if not self.transactional_ddl:
self._update_current_rev(prev_rev, rev)
prev_rev = rev
-
+
if self.transactional_ddl:
self._update_current_rev(current_rev, rev)
-
+
if self.as_sql and self.transactional_ddl:
print "COMMIT;\n"
-
+
def _exec(self, construct):
if isinstance(construct, basestring):
construct = text(construct)
        if self.as_sql:
            print unicode(construct.compile(dialect=self.connection.dialect)).replace("\t", " ") + ";"
else:
self.connection.execute(construct)
-
+
def execute(self, sql):
self._exec(sql)
-
+
def alter_column(self, table_name, column_name,
nullable=util.NO_VALUE,
server_default=util.NO_VALUE,
name=util.NO_VALUE,
type=util.NO_VALUE
):
-
+
if nullable is not util.NO_VALUE:
self._exec(base.ColumnNullable(table_name, column_name, nullable))
if server_default is not util.NO_VALUE:
self._exec(base.ColumnDefault(table_name, column_name, server_default))
-
+
# ... etc
-
+
def add_constraint(self, const):
self._exec(schema.AddConstraint(const))
global _context_opts, config
_context_opts = kw
config = cfg
-
+
def configure_connection(connection):
global _context
from alembic.ddl import base
_context = _context_impls.get(connection.dialect.name, DefaultContext)(connection, **_context_opts)
-
+
def run_migrations(**kw):
_context.run_migrations(**kw)
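
# env.py wiring sketch: a minimal migration environment only needs to hand a
# live connection to configure_connection() and then call run_migrations(),
# since the command functions have already stashed their options via opts().
# The helper name and the sqlite URL are illustrative; a real env.py would
# read the URL from the Config rather than hard-coding it.
def _example_run_env():
    from sqlalchemy import create_engine
    from alembic import context

    engine = create_engine('sqlite:///foo.db')   # illustrative URL
    connection = engine.connect()
    context.configure_connection(connection)     # picks the dialect-specific context
    context.run_migrations()
    connection.close()
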
class AlterTable(DDLElement):
"""Represent an ALTER TABLE statement.
-
+
    Only the string name and optional schema name of the table
    are required, not a full Table object.
-
+
"""
def __init__(self, table_name, schema=None):
self.table_name = table_name
def __init__(self, name, column_name, default, schema=None):
super(ColumnDefault, self).__init__(name, column_name, schema=schema)
self.default = default
-
+
class AddColumn(AlterTable):
def __init__(self, name, column, schema=None):
super(AddColumn, self).__init__(name, schema=schema)
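
# Design note: these DDL constructs carry only string table/column names (plus
# an optional schema), so migrations can emit ALTER TABLE statements without
# reflecting full Table metadata; the context's _exec() then either executes
# them on the connection or, in as_sql mode, prints the compiled statement.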
def quote_dotted(name, quote):
"""quote the elements of a dotted name"""
-
+
result = '.'.join([quote(x) for x in name.split('.')])
return result
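
# usage sketch: the quote argument is expected to be a callable that quotes a
# single identifier (for example a dialect preparer's quote method, an
# assumption here); quote_dotted("myschema.mytable", quote) then quotes each
# dotted component separately rather than quoting the name as a whole.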
class MySQLContext(DefaultContext):
__dialect__ = 'mysql'
-
+
class PostgresqlContext(DefaultContext):
__dialect__ = 'postgresql'
transactional_ddl = True
-
\ No newline at end of file
type_=util.NO_VALUE
):
"""Issue ALTER COLUMN using the current change context."""
-
+
context.alter_column(table_name, column_name,
nullable=nullable,
server_default=server_default,
name=name
)
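
# usage sketch: inside a generated migration script's upgrade() (assuming the
# script template pulls the op functions into the module namespace, as the
# test migration scripts do with execute()), table and column names being
# illustrative:
#
#     def upgrade():
#         alter_column("account", "name",
#                      nullable=False,
#                      server_default="unnamed")
#
# each keyword left at util.NO_VALUE is skipped, so only the attributes
# actually passed produce ALTER statements in the current context.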
t1.append_constraint(f)
-
+
return f
def _unique_constraint(name, source, local_cols):
def _ensure_table_for_fk(metadata, fk):
"""create a placeholder Table object for the referent of a
ForeignKey.
-
+
"""
if isinstance(fk._colspec, basestring):
table_key, cname = fk._colspec.split('.')
def __init__(self, dir):
self.dir = dir
self.versions = os.path.join(self.dir, 'versions')
-
+
if not os.access(dir, os.F_OK):
raise util.CommandError("Path doesn't exist: %r. Please use "
"the 'init' command to create a new "
"scripts folder." % dir)
-
+
@classmethod
def from_config(cls, config):
return ScriptDirectory(
config.get_main_option('script_location'))
-
+
def walk_revisions(self):
"""Iterate through all revisions.
-
+
This is actually a breadth-first tree traversal,
with leaf nodes being heads.
-
+
"""
heads = set(self._get_heads())
base = self._get_rev("base")
break
else:
yield sc
-
+
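    # usage sketch: walk_revisions() yields Script objects from the base
    # revision out to the heads; the branches command above uses exactly this,
    # e.g.:
    #
    #     for sc in script.walk_revisions():
    #         if sc.is_branch_point:
    #             print sc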
def _get_rev(self, id_):
if id_ == 'head':
id_ = self._current_head()
        elif id_ == 'base':
            id_ = None
        try:
            return self._revision_map[id_]
except KeyError:
raise util.CommandError("No such revision %s" % id_)
-
+
def _revs(self, upper, lower):
lower = self._get_rev(lower)
upper = self._get_rev(upper)
while script != lower:
yield script
script = self._revision_map[script.down_revision]
-
+
def upgrade_from(self, destination, current_rev):
return [
(script.module.upgrade, script.revision) for script in
reversed(list(self._revs(destination, current_rev)))
]
-
+
def downgrade_to(self, destination, current_rev):
return [
(script.module.downgrade, script.down_revision) for script in
self._revs(current_rev, destination)
]
-
+
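    # Note on ordering: upgrade_from() reverses the revision chain so the
    # (upgrade_fn, revision) pairs run oldest-first, while downgrade_to()
    # keeps the newest-first order and pairs each downgrade_fn with the
    # down_revision it lands on; run_migrations() then stamps alembic_version
    # after each step (or once at the end under transactional_ddl).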
def run_env(self):
util.load_python_file(self.dir, 'env.py')
map_[rev.down_revision].add_nextrev(rev.revision)
map_[None] = None
return map_
-
+
def _rev_path(self, rev_id):
filename = "%s.py" % rev_id
return os.path.join(self.versions, filename)
-
+
def write(self, rev_id, content):
path = self._rev_path(rev_id)
file(path, 'w').write(content)
raise Exception("Can't change down_revision on a refresh operation.")
self._revision_map[script.revision] = script
script.nextrev = old.nextrev
-
+
def _current_head(self):
current_heads = self._get_heads()
if len(current_heads) > 1:
return current_heads[0]
else:
return None
-
+
def _get_heads(self):
heads = []
for script in self._revision_map.values():
if script and script.is_head:
heads.append(script.revision)
return heads
-
+
def _get_origin(self):
for script in self._revision_map.values():
            if script and script.down_revision is None:
return script
else:
return None
-
+
def generate_template(self, src, dest, **kw):
util.status("Generating %s" % os.path.abspath(dest),
util.template_to_file,
            src,
            dest,
**kw
)
-
+
def copy_file(self, src, dest):
util.status("Generating %s" % os.path.abspath(dest),
shutil.copy,
src, dest)
-
+
def generate_rev(self, revid, message):
current_head = self._current_head()
path = self._rev_path(revid)
if script.down_revision:
self._revision_map[script.down_revision].add_nextrev(script.revision)
return script
-
+
class Script(object):
nextrev = frozenset()
-
+
def __init__(self, module, rev_id):
self.module = module
self.revision = rev_id
self.down_revision = getattr(module, 'down_revision', None)
-
+
@property
def doc(self):
return re.split(r"\n\n", self.module.__doc__)[0]
def add_nextrev(self, rev):
self.nextrev = self.nextrev.union([rev])
-
+
@property
def is_head(self):
return not bool(self.nextrev)
-
+
@property
def is_branch_point(self):
return len(self.nextrev) > 1
-
+
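    # structure sketch (revision ids are illustrative): scripts chain backwards
    # through down_revision, while _revision_map links them forwards through
    # nextrev, e.g.:
    #
    #     None <- '1a2b3c4d5e6f' (down_revision=None)           nextrev={'7f8e...'}
    #          <- '7f8e9d0c1b2a' (down_revision='1a2b3c4d5e6f')  nextrev=frozenset()  (head)
    #
    # a script whose nextrev holds more than one id is a branch point.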
def __str__(self):
return "%s -> %s%s%s, %s" % (
            self.down_revision,
            self.revision,
" (head)" if self.is_head else "",
" (branchpoint)" if self.is_branch_point else "",
self.doc)
-
+
@classmethod
def from_path(cls, path):
dir_, filename = os.path.split(path)
return cls.from_filename(dir_, filename)
-
+
@classmethod
def from_filename(cls, dir_, filename):
m = _rev_file.match(filename)
        if not m:
            return None
module = util.load_python_file(dir_, filename)
return Script(module, m.group(1))
-
\ No newline at end of file
class CommandError(Exception):
pass
-
+
try:
width = int(os.environ['COLUMNS'])
except (KeyError, ValueError):
if u.password:
u.password = 'XXXXX'
return str(u)
-
+
def warn(msg):
warnings.warn(msg)
-
+
def msg(msg, newline=True):
lines = textwrap.wrap(msg, width)
if len(lines) > 1:
def load_python_file(dir_, filename):
"""Load a file from the given path as a Python module."""
-
+
module_id = re.sub(r'\W', "_", filename)
path = os.path.join(dir_, filename)
module = imp.load_source(module_id, path, open(path, 'rb'))
def rev_id():
val = int(uuid.uuid4()) % 100000000000000
return hex(val)[2:-1]
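
# note: rev_id() derives a short token from a random UUID; under Python 2,
# hex() of a long value ends with a trailing 'L', so the [2:-1] slice strips
# both the '0x' prefix and that suffix, leaving a plain hex string.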
-
+
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
def __init__(self, fget, doc=None):
if files:
out.append((root, [os.path.join(root, f) for f in files]))
return out
-
+
setup(name='alembic',
version=VERSION,
description="A database migration tool for SQLAlchemy.",
long_description="""\
Alembic is an open-ended migrations tool.
Basic operation involves the creation of script files,
-each representing a version transition for one or more databases.
+each representing a version transition for one or more databases.
The scripts execute within the context of a particular connection
and transactional configuration that is explicitly constructed.
* The ability to integrate configuration with other frameworks.
A Pylons template is included which pulls all configuration
from the Pylons project environment.
-
+
""",
classifiers=[
'Development Status :: 3 - Alpha',
-from sqlalchemy.test.testing import eq_, ne_
from sqlalchemy.util import defaultdict
from sqlalchemy.engine import url, default
import shutil
return default.DefaultDialect()
else:
return _dialects[name]
-
-
+
+
def assert_compiled(element, assert_string, dialect=None):
dialect = _get_dialect(dialect)
    eq_(
        unicode(element.compile(dialect=dialect)).replace("\n", "").replace("\t", ""),
        assert_string.replace("\n", "").replace("\t", "")
)
+def eq_(a, b, msg=None):
+ """Assert a == b, with repr messaging on failure."""
+ assert a == b, msg or "%r != %r" % (a, b)
+
+def ne_(a, b, msg=None):
+ """Assert a != b, with repr messaging on failure."""
+ assert a != b, msg or "%r == %r" % (a, b)
+
def _testing_config():
from alembic.config import Config
if not os.access(staging_directory, os.F_OK):
datefmt = %%H:%%M:%%S
""" % (dir_, dir_))
return cfg
-
+
def sqlite_db():
# sqlite caches table pragma info
# per connection, so create a new
# engine for each assertion
dir_ = os.path.join(staging_directory, 'scripts')
return create_engine('sqlite:///%s/foo.db' % dir_)
-
+
def staging_env(create=True):
from alembic import command, script
cfg = _testing_config()
if create:
command.init(cfg, os.path.join(staging_directory, 'scripts'))
return script.ScriptDirectory.from_config(cfg)
-
+
def clear_staging_env():
shutil.rmtree(staging_directory, True)
-
\ No newline at end of file
abc = util.rev_id()
def_ = util.rev_id()
ne_(abc, def_)
-
+
def test_003_heads():
eq_(env._get_heads(), [])
-
+
def test_004_rev():
script = env.generate_rev(abc, "this is a message")
eq_(script.doc, "this is a message")
assert os.access(os.path.join(env.dir, 'versions', '%s.py' % abc), os.F_OK)
assert callable(script.module.upgrade)
eq_(env._get_heads(), [abc])
-
+
def test_005_nextrev():
script = env.generate_rev(def_, "this is the next rev")
eq_(script.revision, def_)
def test_006_from_clean_env():
# test the environment so far with a
# new ScriptDirectory instance.
-
+
env = staging_env(create=False)
abc_rev = env._revision_map[abc]
def_rev = env._revision_map[def_]
eq_(abc_rev.revision, abc)
eq_(def_rev.down_revision, abc)
eq_(env._get_heads(), [def_])
-
+
def setup():
global env
env = staging_env()
-
+
def teardown():
clear_staging_env()
\ No newline at end of file
c = env.generate_rev(util.rev_id(), None)
d = env.generate_rev(util.rev_id(), None)
e = env.generate_rev(util.rev_id(), None)
-
+
def teardown():
clear_staging_env()
def test_upgrade_path():
-
+
eq_(
env.upgrade_from(e.revision, c.revision),
[
(c.module.upgrade, c.revision),
]
)
-
+
def test_downgrade_path():
eq_(
AddConstraint(fk),
"ALTER TABLE t1 ADD CONSTRAINT hoho FOREIGN KEY(foo, bar) REFERENCES t2 (bat, hoho)"
)
-
+
def test_unique_constraint():
uc = op._unique_constraint('uk_test', 't1', ['foo', 'bar'])
assert_compiled(
AddConstraint(uc),
"ALTER TABLE t1 ADD CONSTRAINT uk_test UNIQUE (foo, bar)"
)
-
+
def test_table():
tb = op._table("some_table",
"FOREIGN KEY(foo_id) REFERENCES foo (id), "
"FOREIGN KEY(foo_bar) REFERENCES foo (bar))"
)
-
+
m = MetaData()
foo = Table('foo', m, Column('id', Integer, primary_key=True))
tb = op._table("some_table",
a = util.rev_id()
b = util.rev_id()
c = util.rev_id()
-
+
script = ScriptDirectory.from_config(cfg)
script.generate_rev(a, None)
script.write(a, """
execute("DROP TABLE bat")
""" % b)
-
-
+
+
def test_002_upgrade():
command.upgrade(cfg, c)
db = sqlite_db()
global cfg, env
env = staging_env()
cfg = _sqlite_testing_config()
-
-
+
+
def teardown():
clear_staging_env()
\ No newline at end of file