"""Initialize a new scripts directory."""
if os.access(directory, os.F_OK):
- util.err("Directory %s already exists" % directory)
+ raise util.CommandException("Directory %s already exists" % directory)
template_dir = os.path.join(config.get_template_directory(),
template)
if not os.access(template_dir, os.F_OK):
- util.err("No such template %r" % template)
+ raise util.CommandException("No such template %r" % template)
util.status("Creating directory %s" % os.path.abspath(directory),
os.makedirs, directory)
context.config = config
script.run_env()
-def revert(config, revision):
+def downgrade(config, revision):
"""Revert to a previous version."""
script = ScriptDirectory.from_config(config)
"""List changeset scripts in chronological order."""
script = ScriptDirectory.from_config(config)
+ heads = set(script._get_heads())
+ base = script._get_rev("base")
+ while heads:
+ todo = set(heads)
+ heads = set()
+ for head in todo:
+ print
+ if head in heads:
+ break
+ for sc in script._revs(head, base):
+ if sc.is_branch_point and sc.revision not in todo:
+ heads.add(sc.revision)
+ break
+ else:
+ print sc
+
+def current(config):
+ """Display the current revision for each database."""
+
+ script = ScriptDirectory.from_config(config)
+ def display_version(rev):
+ print "Current revision for %s: %s" % (
+ util.obfuscate_url_pw(
+ context.get_context().connection.engine.url),
+ script._get_rev(rev))
+ return []
+
+ context._migration_fn = display_version
+ context.config = config
+ script.run_env()
def splice(config, parent, child):
"""'splice' two branches, creating a new revision file."""
def __init__(cls, classname, bases, dict_):
newtype = type.__init__(cls, classname, bases, dict_)
if '__dialect__' in dict_:
- _context_impls[dict_['__dialect__']] = newtype
+ _context_impls[dict_['__dialect__']] = cls
return newtype
_context_impls = {}
import shutil
import re
import inspect
+import datetime
_rev_file = re.compile(r'([a-z0-9]+)\.py$')
_mod_def_re = re.compile(r'(upgrade|downgrade)_([a-z0-9]+)')
% (rev.down_revision, rev))
rev.down_revision = None
else:
- map_[rev.down_revision].nextrev = rev.revision
+ map_[rev.down_revision].add_nextrev(rev.revision)
map_[None] = None
return map_
return None
def _get_heads(self):
- # TODO: keep map sorted chronologically
heads = []
for script in self._revision_map.values():
- if script and script.nextrev is None:
+ if script and script.is_head:
heads.append(script.revision)
return heads
def _get_origin(self):
- # TODO: keep map sorted chronologically
-
for script in self._revision_map.values():
if script.down_revision is None \
and script.revision in self._revision_map:
os.path.join(self.versions, filename),
up_revision=str(revid),
down_revision=current_head,
- message=message if message is not None else ("Alembic revision %s" % revid)
+ create_date=datetime.datetime.now(),
+ message=message if message is not None else ("empty message")
)
script = Script.from_path(self.versions, filename)
self._revision_map[script.revision] = script
if script.down_revision:
- self._revision_map[script.down_revision].nextrev = script.revision
+ self._revision_map[script.down_revision].add_nextrev(script.revision)
return script
class Script(object):
- nextrev = None
+ nextrev = frozenset()
def __init__(self, module, rev_id):
self.module = module
self.revision = rev_id
self.down_revision = getattr(module, 'down_revision', None)
+ @property
+ def doc(self):
+ return re.split(r"\n\n", self.module.__doc__)[0]
+
+ def add_nextrev(self, rev):
+ self.nextrev = self.nextrev.union([rev])
+
+ @property
+ def is_head(self):
+ return not bool(self.nextrev)
+
+ @property
+ def is_branch_point(self):
+ return len(self.nextrev) > 1
+
def __str__(self):
- return "revision %s" % self.revision
+ return "%s -> %s%s%s, %s" % (
+ self.down_revision,
+ self.revision,
+ " (head)" if self.is_head else "",
+ " (branchpoint)" if self.is_branch_point else "",
+ self.doc)
@classmethod
def from_path(cls, dir_, filename):
import os
import textwrap
from sqlalchemy import util
+from sqlalchemy.engine import url
import imp
import warnings
import re
msg(message)
sys.exit(-1)
-
+def obfuscate_url_pw(u):
+ u = url.make_url(u)
+ if u.password:
+ u.password = 'XXXXX'
+ return str(u)
+
def warn(msg):
warnings.warn(msg)
-"""${message}"""
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision}
+Create Date: ${create_date}
+
+"""
# downgrade revision identifier, used by Alembic.
down_revision = ${repr(down_revision)}
-"""${message}"""
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision}
+Create Date: ${create_date}
+
+"""
# downgrade revision identifier, used by Alembic.
down_revision = ${repr(down_revision)}
-"""${message}"""
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision}
+Create Date: ${create_date}
+
+"""
# downgrade revision identifier, used by Alembic.
down_revision = ${repr(down_revision)}
def test_004_rev():
script = env.generate_rev(abc, "this is a message")
- eq_(script.module.__doc__,"this is a message")
+ eq_(script.doc, "this is a message")
eq_(script.revision, abc)
eq_(script.down_revision, None)
assert os.access(os.path.join(env.dir, 'versions', '%s.py' % abc), os.F_OK)
script = env.generate_rev(def_, "this is the next rev")
eq_(script.revision, def_)
eq_(script.down_revision, abc)
- eq_(env._revision_map[abc].nextrev, def_)
+ eq_(env._revision_map[abc].nextrev, set([def_]))
assert script.module.down_revision == abc
assert callable(script.module.upgrade)
assert callable(script.module.downgrade)
env = staging_env(create=False)
abc_rev = env._revision_map[abc]
def_rev = env._revision_map[def_]
- eq_(abc_rev.nextrev, def_)
+ eq_(abc_rev.nextrev, set([def_]))
eq_(abc_rev.revision, abc)
eq_(def_rev.down_revision, abc)
eq_(env._get_heads(), [def_])