tempname,
'README')
synopsis = open(readme).next()
- print util.format_opt(tempname, synopsis)
+ print "%s - %s" % (tempname, synopsis)
print "\nTemplates are used via the 'init' command, e.g.:"
print "\n alembic init --template pylons ./scripts"
from alembic import command, util
-from optparse import OptionParser
+from argparse import ArgumentParser
import ConfigParser
import inspect
import os
-import sys
class Config(object):
def __init__(self, file_):
def main(argv):
- # TODO:
- # OK, what's the super option parser library that
- # allows <command> plus command-specfic sub-options,
- # and derives everything from callables ?
- # we're inventing here a bit.
+ def add_options(parser, positional, kwargs):
+ parser.add_argument("-c", "--config",
+ type=str,
+ default="alembic.ini",
+ help="Alternate config file")
+ if 'template' in kwargs:
+ parser.add_argument("-t", "--template",
+ default='generic',
+ type=str,
+ help="Setup template for use with 'init'")
+ if 'message' in kwargs:
+ parser.add_argument("-m", "--message",
+ type=str,
+ help="Message string to use with 'revision'")
+ if 'sql' in kwargs:
+ parser.add_argument("--sql",
+ action="store_true",
+ help="Don't emit SQL to database - dump to "
+ "standard output instead")
+
+ positional_help = {
+ 'directory':"location of scripts directory",
+ 'revision':"revision identifier"
+ }
+ for arg in positional:
+ subparser.add_argument(arg, help=positional_help.get(arg))
+
+ parser = ArgumentParser()
+ subparsers = parser.add_subparsers()
- commands = {}
for fn in [getattr(command, n) for n in dir(command)]:
if inspect.isfunction(fn) and \
fn.__name__[0] != '_' and \
positional = spec[0][1:]
kwarg = []
- commands[fn.__name__] = {
- 'name':fn.__name__,
- 'fn':fn,
- 'positional':positional,
- 'kwargs':kwarg
- }
+ subparser = subparsers.add_parser(
+ fn.__name__,
+ help=fn.__doc__)
+ add_options(subparser, positional, kwarg)
+ subparser.set_defaults(cmd=(fn, positional, kwarg))
- def format_cmd(cmd):
- return "%s %s" % (
- cmd['name'],
- " ".join(["<%s>" % p for p in cmd['positional']])
- )
+ options = parser.parse_args()
- def format_opt(cmd, padding=32):
- opt = format_cmd(cmd)
- return " " + opt + \
- ((padding - len(opt)) * " ") + cmd['fn'].__doc__
-
- parser = OptionParser(
- "usage: %prog [options] <command> [command arguments]\n\n"
- "Available Commands:\n" +
- "\n".join(sorted([
- format_opt(cmd)
- for cmd in commands.values()
- ])) +
- "\n\n<revision> is a hex revision id, 'head' or 'base'."
- )
-
- parser.add_option("-c", "--config",
- type="string",
- default="alembic.ini",
- help="Alternate config file")
- parser.add_option("-t", "--template",
- default='generic',
- type="string",
- help="Setup template for use with 'init'")
- parser.add_option("-m", "--message",
- type="string",
- help="Message string to use with 'revision'")
- parser.add_option("--sql",
- action="store_true",
- help="Dump output to a SQL file")
-
- cmd_line_options, cmd_line_args = parser.parse_args(argv[1:])
-
- if len(cmd_line_args) < 1:
- util.err("no command specified")
-
- cmd = cmd_line_args.pop(0).replace('-', '_')
+ fn, positional, kwarg = options.cmd
+ cfg = Config(options.config)
try:
- cmd_fn = commands[cmd]
- except KeyError:
- util.err("no such command %r" % cmd)
-
- kw = dict(
- (k, getattr(cmd_line_options, k))
- for k in cmd_fn['kwargs']
- )
-
- if len(cmd_line_args) != len(cmd_fn['positional']):
- util.err("Usage: %s %s [options]" % (
- os.path.basename(argv[0]),
- format_cmd(cmd_fn)
- ))
-
- cfg = Config(cmd_line_options.config)
- try:
- cmd_fn['fn'](cfg, *cmd_line_args, **kw)
+ fn(cfg,
+ *[getattr(options, k) for k in positional],
+ **dict((k, getattr(options, k)) for k in kwarg)
+ )
except util.CommandError, e:
util.err(str(e))
from alembic import util
-from sqlalchemy import MetaData, Table, Column, String, literal_column, text
+from sqlalchemy import MetaData, Table, Column, String, literal_column, \
+ text
from sqlalchemy.schema import CreateTable
import logging
def _current_rev(self):
if self.as_sql:
- if not self.connection.dialect.has_table(self.connection, 'alembic_version'):
+ if not self.connection.dialect.has_table(self.connection,
+ 'alembic_version'):
self._exec(CreateTable(_version))
return None
else:
if new is None:
self._exec(_version.delete())
elif old is None:
- self._exec(_version.insert().values(version_num=literal_column("'%s'" % new)))
+ self._exec(_version.insert().
+ values(version_num=literal_column("'%s'" % new))
+ )
else:
- self._exec(_version.update().values(version_num=literal_column("'%s'" % new)))
+ self._exec(_version.update().
+ values(version_num=literal_column("'%s'" % new))
+ )
def run_migrations(self, **kw):
log.info("Context class %s.", self.__class__.__name__)
if isinstance(construct, basestring):
construct = text(construct)
if self.as_sql:
- print unicode(construct.compile(dialect=self.connection.dialect)).replace("\t", " ") + ";"
+ print unicode(
+ construct.compile(dialect=self.connection.dialect)
+ ).replace("\t", " ") + ";"
else:
self.connection.execute(construct)
if nullable is not util.NO_VALUE:
self._exec(base.ColumnNullable(table_name, column_name, nullable))
if server_default is not util.NO_VALUE:
- self._exec(base.ColumnDefault(table_name, column_name, server_default))
+ self._exec(base.ColumnDefault(
+ table_name, column_name, server_default
+ ))
# ... etc
def configure_connection(connection):
global _context
from alembic.ddl import base
- _context = _context_impls.get(connection.dialect.name, DefaultContext)(connection, **_context_opts)
+ _context = _context_impls.get(
+ connection.dialect.name,
+ DefaultContext)(connection, **_context_opts)
def run_migrations(**kw):
_context.run_migrations(**kw)
def create_foreign_key(name, source, referent, local_cols, remote_cols):
get_context().add_constraint(
- _foreign_key_constraint(source, referent, local_cols, remote_cols)
+ _foreign_key_constraint(source, referent,
+ local_cols, remote_cols)
)
def create_unique_constraint(name, source, local_cols):
if script is None:
continue
if script.revision in map_:
- util.warn("Revision %s is present more than once" % script.revision)
+ util.warn("Revision %s is present more than once" %
+ script.revision)
map_[script.revision] = script
for rev in map_.values():
if rev.down_revision is None:
script = Script.from_path(path)
old = self._revision_map[script.revision]
if old.down_revision != script.down_revision:
- raise Exception("Can't change down_revision on a refresh operation.")
+ raise Exception("Can't change down_revision "
+ "on a refresh operation.")
self._revision_map[script.revision] = script
script.nextrev = old.nextrev
script = Script.from_path(path)
self._revision_map[script.revision] = script
if script.down_revision:
- self._revision_map[script.down_revision].add_nextrev(script.revision)
+ self._revision_map[script.down_revision].\
+ add_nextrev(script.revision)
return script
class Script(object):
class memoized_property(object):
"""A read-only @property that is only evaluated once."""
+
def __init__(self, fget, doc=None):
self.fget = fget
self.__doc__ = doc or fget.__doc__
import os
import re
+extra = {}
+if sys.version_info >= (3, 0):
+ extra.update(
+ use_2to3=True,
+ )
+
v = open(os.path.join(os.path.dirname(__file__), 'alembic', '__init__.py'))
VERSION = re.compile(r".*__version__ = '(.*?)'", re.S).match(v.read()).group(1)
v.close()
names plus flags.
* Transparent and explicit declaration of all configuration
as well as the engine/transactional environment in which
- migrations run, based on templates which generate the
+ migrations run, starting with templates which generate the
migration environment. The environment can be modified
to suit the specifics of the use case.
* Support for multiple-database configurations, including
migrations that are run for all / some connections.
* Support for running migrations transactionally for
"transactional DDL" backends, which include Postgresql and
- SQLite.
+ Microsoft SQL Server.
* Allowing any series of migrations to be generated as SQL
- scripts.
+ scripts to standard out, instead of emitting to the database.
* Support for branched series of migrations, including the
ability to view branches and "splice" them together.
* The ability to "prune" old migration scripts, setting the
"root" of the system to a newer file.
* The ability to integrate configuration with other frameworks.
- A Pylons template is included which pulls all configuration
- from the Pylons project environment.
+ A sample Pylons template is included which pulls all
+ configuration from the Pylons project environment.
""",
classifiers=[
'Environment :: Console',
'Intended Audience :: Developers',
'Programming Language :: Python',
+ 'Programming Language :: Python :: 3',
'Topic :: Database :: Front-Ends',
],
keywords='SQLAlchemy migrations',
],
entry_points="""
""",
+ **extra
)
fileConfig(config.config_file_name)
-engine = engine_from_config(config.get_section('alembic'), prefix='sqlalchemy.')
+engine = engine_from_config(
+ config.get_section('alembic'), prefix='sqlalchemy.')
connection = engine.connect()
context.configure_connection(connection)
for name in re.split(r',\s*', db_names):
engines[name] = rec = {}
rec['engine'] = engine = \
- engine_from_config(options.get_section(name), prefix='sqlalchemy.')
+ engine_from_config(options.get_section(name),
+ prefix='sqlalchemy.')
rec['connection'] = conn = engine.connect()
-
+
if USE_TWOPHASE:
rec['transaction'] = conn.begin_twophase()
else:
rec['transaction'] = conn.begin()
-
+
try:
for name, rec in engines.items():
context.configure_connection(rec['connection'])
if USE_TWOPHASE:
for rec in engines.values():
rec['transaction'].prepare()
-
+
for rec in engines.values():
rec['transaction'].commit()
except:
def assert_compiled(element, assert_string, dialect=None):
dialect = _get_dialect(dialect)
eq_(
- unicode(element.compile(dialect=dialect)).replace("\n", "").replace("\t", ""),
+ unicode(element.compile(dialect=dialect)).\
+ replace("\n", "").replace("\t", ""),
assert_string.replace("\n", "").replace("\t", "")
)
"ALTER TABLE footable ADD COLUMN foocol VARCHAR(50) NOT NULL"
)
assert_compiled(
- AddColumn("footable", Column("foocol", String(50), server_default="12")),
+ AddColumn("footable", Column("foocol", String(50),
+ server_default="12")),
"ALTER TABLE footable ADD COLUMN foocol VARCHAR(50) DEFAULT '12'"
)
ColumnNullable("footable", "foocol", False),
"ALTER TABLE footable ALTER COLUMN foocol NOT NULL"
)
-
\ No newline at end of file
eq_(script.doc, "this is a message")
eq_(script.revision, abc)
eq_(script.down_revision, None)
- assert os.access(os.path.join(env.dir, 'versions', '%s.py' % abc), os.F_OK)
+ assert os.access(
+ os.path.join(env.dir, 'versions', '%s.py' % abc), os.F_OK)
assert callable(script.module.upgrade)
eq_(env._get_heads(), [abc])
from sqlalchemy import Integer
def test_foreign_key():
- fk = op._foreign_key_constraint('fk_test', 't1', 't2', ['foo', 'bar'], ['bat', 'hoho'])
+ fk = op._foreign_key_constraint('fk_test', 't1', 't2',
+ ['foo', 'bar'], ['bat', 'hoho'])
assert_compiled(
AddConstraint(fk),
- "ALTER TABLE t1 ADD CONSTRAINT hoho FOREIGN KEY(foo, bar) REFERENCES t2 (bat, hoho)"
+ "ALTER TABLE t1 ADD CONSTRAINT hoho FOREIGN KEY(foo, bar) "
+ "REFERENCES t2 (bat, hoho)"
)
def test_unique_constraint():