from sqlalchemy import MetaData, Table, Column, String, literal_column, \
text
from sqlalchemy import schema, create_engine
+from sqlalchemy.engine import url as sqla_url
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import _BindParamClause
import sys
transactional_ddl = False
as_sql = False
- def __init__(self, connection, fn, as_sql=False, output_buffer=sys.stdout):
+ def __init__(self, dialect, connection, fn, as_sql=False,
+ output_buffer=None,
+ transactional_ddl=None):
+ self.dialect = dialect
if as_sql:
self.connection = self._stdout_connection(connection)
assert self.connection is not None
self.connection = connection
self._migrations_fn = fn
self.as_sql = as_sql
- self.output_buffer = output_buffer
+ if output_buffer is None:
+ self.output_buffer = sys.stdout
+ else:
+ self.output_buffer = output_buffer
+ if transactional_ddl is not None:
+ self.transactional_ddl = transactional_ddl
def _current_rev(self):
if self.as_sql:
def run_migrations(self, **kw):
log.info("Context class %s.", self.__class__.__name__)
+ if self.as_sql:
+ log.info("Generating static SQL")
log.info("Will assume %s DDL.",
"transactional" if self.transactional_ddl
else "non-transactional")
else:
self.connection.execute(construct, *args, **kw)
- @property
- def dialect(self):
- return self.connection.dialect
-
def static_output(self, text):
self.output_buffer.write(text + "\n\n")
def dump(construct, *multiparams, **params):
self._exec(construct)
- return create_engine(connection.engine.url,
+ return create_engine("%s://" % self.dialect.name,
strategy="mock", executor=dump)
@property
return compiler.render_literal_bindparam(element, **kw)
_context_opts = {}
+_context = None
def opts(cfg, **kw):
"""Set up options that will be used by the :func:`.configure_connection`
_context_opts.update(kw)
config = cfg
-def configure_connection(connection):
- """Configure the migration environment against a specific
- database connection, an instance of :class:`sqlalchemy.engine.Connection`.
+def requires_connection():
+ """Return True if the current migrations environment should have
+ an active database connection.
+
+ """
+ return not _context_opts.get('as_sql', False)
+
+def configure(
+ connection=None,
+ url=None,
+ dialect_name=None,
+ transactional_ddl=None,
+ output_buffer=None
+ ):
+ """Configure the migration environment.
+
+ The important thing needed here is first a way to figure out
+ what kind of "dialect" is in use. The second is to pass
+ an actual database connection, if one is required.
+
+ If the :func:`requires_connection` function returns False,
+ then no connection is needed here. Otherwise, the
+ object should be an instance of :class:`sqlalchemy.engine.Connection`.
This function is typically called from the ``env.py``
script within a migration environment. It can be called
for which it was called is the one that will be operated upon
by the next call to :func:`.run_migrations`.
+ :param connection: a :class:`sqlalchemy.engine.Connection`. The type of dialect
+ to be used will be derived from this.
+ :param url: a string database url, or a :class:`sqlalchemy.engine.url.URL` object.
+ The type of dialect to be used will be derived from this if ``connection`` is
+ not passed.
+ :param dialect_name: string name of a dialect, such as "postgresql", "mssql", etc.
+ The type of dialect to be used will be derived from this if ``connection``
+ and ``url`` are not passed.
+ :param transactional_ddl: Force the usage of "transactional" DDL on or off;
+ this otherwise defaults to whether or not the dialect in use supports it.
+ :param output_buffer: a file-like object that will be used for textual output
+ when the ``--sql`` option is used to generate SQL scripts. Defaults to
+ ``sys.stdout`` it not passed here.
"""
+
+ if connection:
+ dialect = connection.dialect
+ elif url:
+ url = sqla_url.make_url(url)
+ dialect = url.get_dialect()()
+ elif dialect_name:
+ url = sqla_url.make_url("%s://" % dialect_name)
+ dialect = url.get_dialect()()
+ else:
+ raise Exception("Connection, url, or dialect_name is required.")
+
global _context
from alembic.ddl import base
+ opts = _context_opts.copy()
+ opts.setdefault("transactional_ddl", transactional_ddl)
+ opts.setdefault("output_buffer", output_buffer)
_context = _context_impls.get(
- connection.dialect.name,
- DefaultContext)(connection, **_context_opts)
+ dialect.name,
+ DefaultContext)(dialect, connection, **opts)
+
+def configure_connection(connection):
+ """Deprecated; use :func:`alembic.context.configure`."""
+ configure(connection=connection)
def run_migrations(**kw):
"""Run migrations as determined by the current command line configuration
"""
_context.run_migrations(**kw)
+def execute(sql):
+ """Execute the given SQL using the current change context.
+
+ In a SQL script context, the statement is emitted directly to the
+ output stream.
+
+ """
+ get_context().execute(sql)
+
def get_context():
+ if _context is None:
+ raise Exception("No context has been configured yet.")
return _context
\ No newline at end of file
logging.fileConfig(options.config_file)
db_names = options.get_main_option('databases')
-
engines = {}
for name in re.split(r',\s*', db_names):
engines[name] = rec = {}
rec['engine'] = engine = \
engine_from_config(options.get_section(name),
prefix='sqlalchemy.')
- rec['connection'] = conn = engine.connect()
- if USE_TWOPHASE:
- rec['transaction'] = conn.begin_twophase()
- else:
- rec['transaction'] = conn.begin()
-try:
+if not context.requires_connection():
for name, rec in engines.items():
- context.configure_connection(rec['connection'])
+ context.configure(
+ dialect_name=rec['engine'].name
+ )
context.run_migrations(engine=name)
+else:
+ for name, rec in engines.items():
+ engine = rec['engine']
+ rec['connection'] = conn = engine.connect()
+
+ if USE_TWOPHASE:
+ rec['transaction'] = conn.begin_twophase()
+ else:
+ rec['transaction'] = conn.begin()
+
+ try:
+ for name, rec in engines.items():
+ context.configure(
+ connection=rec['connection'],
+ dialect_name=rec['engine'].name
+ )
+ context.execute("--running migrations for engine %s" % name)
+ context.run_migrations(engine=name)
+
+ if USE_TWOPHASE:
+ for rec in engines.values():
+ rec['transaction'].prepare()
- if USE_TWOPHASE:
for rec in engines.values():
- rec['transaction'].prepare()
-
- for rec in engines.values():
- rec['transaction'].commit()
-except:
- for rec in engines.values():
- rec['transaction'].rollback()
- raise
\ No newline at end of file
+ rec['transaction'].commit()
+ except:
+ for rec in engines.values():
+ rec['transaction'].rollback()
+ raise
\ No newline at end of file