import re
import inspect
-_uuid_re = re.compile(r'[a-z0-9]{16}')
-_mod_def_re = re.compile(r'(upgrade|downgrade)_([a-z0-9]{16})')
+_uuid_re = re.compile(r'[a-z0-9]{32}\.py$')
+_mod_def_re = re.compile(r'(upgrade|downgrade)_([a-z0-9]{32})')
class ScriptDirectory(object):
def __init__(self, dir, options):
self.dir = dir
- self.options = otions
+ self.versions = os.path.join(self.dir, 'versions')
+
+ if not os.access(dir, os.F_OK):
+ options.err("Path doesn't exist: %r. Please use "
+ "the 'init' command to create a new "
+ "scripts folder." % dir)
+ self.options = options
@classmethod
def from_options(cls, options):
@util.memoized_property
def _revision_map(self):
- for file_ in os.listdir(self.dir):
- script = Script.from_file(self.dir, file_)
+ map_ = {}
+ for file_ in os.listdir(self.versions):
+ script = Script.from_path(self.versions, file_)
if script is None:
continue
+ if script.upgrade in map_:
+ util.warn("Revision %s is present more than once" % script.upgrade)
map_[script.upgrade] = script
+ for rev in map_.values():
+ if rev.downgrade is None:
+ continue
+ if rev.downgrade not in map_:
+ util.warn("Revision %s referenced from %s is not present"
+ % (rev.downgrade, rev))
+ rev.downgrade = None
+ else:
+ map_[rev.downgrade].nextrev = rev.upgrade
return map_
- def _get_head(self):
+ def _get_heads(self):
# TODO: keep map sorted chronologically
-
+ heads = []
for script in self._revision_map.values():
- if script.upgrade is None \
- and script.downgrade in self._revision_map:
- return script
- else:
- return None
+ if script.nextrev is None:
+ heads.append(script)
+ return heads
def _get_origin(self):
# TODO: keep map sorted chronologically
return None
def generate_template(self, src, dest, **kw):
- util.status("Generating %s" % os.path.abspath(src),
+ util.status("Generating %s" % os.path.abspath(dest),
util.template_to_file,
src,
dest,
util.status("Generating %s" % os.path.abspath(dest),
shutil.copy,
src, dest)
-
- def generate_rev(self, revid):
- current_head = self._get_head()
+ def generate_rev(self, revid, message):
+ current_heads = self._get_heads()
+ if len(current_heads) > 1:
+ raise Exception("Only a single head supported so far...")
+ if current_heads:
+ current_head = current_heads[0]
+ else:
+ current_head = None
self.generate_template(
- os.path.join(self.dir, "script.py.mako",
- up_revision=revid,
- down_revision=current_head.upgrade if current_head else None
- )
+ os.path.join(self.dir, "script.py.mako"),
+ os.path.join(self.versions, "%s.py" % revid.hex),
+ up_revision=str(revid.hex),
+ down_revision=current_head.upgrade if current_head else None,
+ message=message if message is not None else ("Alembic revision %s" % revid.hex)
)
class Script(object):
+ nextrev = None
+
def __init__(self, module):
self.module = module
self.upgrade = self.downgrade = None
self.downgrade = m.group(2)
if not self.downgrade and not self.upgrade:
raise Exception("Script %s has no upgrade or downgrade path" % module)
-
+
+ def __str__(self):
+ return "revision %s" % self.upgrade
+
@classmethod
def from_path(cls, dir_, filename):
if not _uuid_re.match(filename):
return None
-
+
+ print "LOAD PYTHON FILE", filename
module = util.load_python_file(dir_, filename)
return Script(module)
\ No newline at end of file
import textwrap
from sqlalchemy import util
import imp
+import warnings
+import re
NO_VALUE = util.symbol("NO_VALUE")
return " " + opt + \
((padding - len(opt)) * " ") + hlp
-def status(message, fn, *arg, **kw):
- msg(message + "...", False)
+def status(_statmsg, fn, *arg, **kw):
+ msg(_statmsg + "...", False)
try:
ret = fn(*arg, **kw)
sys.stdout.write("done\n")
sys.stdout.write("FAILED\n")
raise
-
+def warn(msg):
+ warnings.warn(msg)
+
def msg(msg, newline=True):
lines = textwrap.wrap(msg, width)
if len(lines) > 1: