self.impl = ddl.DefaultImpl.get_by_dialect(dialect)(
dialect, self.connection, self.as_sql,
transactional_ddl,
- self.output_buffer
+ self.output_buffer,
+ _context_opts
)
log.info("Context impl %s.", self.impl.__class__.__name__)
if self.as_sql:
upgrade_token="upgrades",
downgrade_token="downgrades",
sqlalchemy_module_prefix="sa.",
+ **kw
):
"""Configure the migration environment.
what kind of "dialect" is in use. The second is to pass
an actual database connection, if one is required.
- If the :func:`.requires_connection` function returns False,
+ If the :func:`.is_offline_mode` function returns ``True``,
then no connection is needed here. Otherwise, the
``connection`` parameter should be present as an
instance of :class:`sqlalchemy.engine.base.Connection`.
for which it was called is the one that will be operated upon
by the next call to :func:`.run_migrations`.
- :param connection: a :class:`sqlalchemy.engine.base.Connection`. The type of dialect
- to be used will be derived from this.
+ General parameters:
+
+ :param connection: a :class:`~sqlalchemy.engine.base.Connection` to use
+ for SQL execution in "online" mode. When present, is also used to
+ determine the type of dialect in use.
:param url: a string database url, or a :class:`sqlalchemy.engine.url.URL` object.
The type of dialect to be used will be derived from this if ``connection`` is
not passed.
``--sql`` mode.
:param tag: a string tag for usage by custom ``env.py`` scripts. Set via
the ``--tag`` option, can be overridden here.
+
+ Parameters specific to the autogenerate feature, when ``alembic revision``
+ is run with the ``--autogenerate`` feature:
+
:param target_metadata: a :class:`sqlalchemy.schema.MetaData` object that
- will be consulted if the ``--autogenerate`` option is passed to the
- "alembic revision" command. The tables present will be compared against
+ will be consulted during autogeneration. The tables present will be compared against
what is locally available on the target :class:`~sqlalchemy.engine.base.Connection`
to produce candidate upgrade/downgrade operations.
to proceed. Note that some backends such as Postgresql actually execute
the two defaults on the database side to compare for equivalence.
- :param upgrade_token: when running "alembic revision" with the ``--autogenerate``
- option, the text of the candidate upgrade operations will be present in this
- template variable when ``script.py.mako`` is rendered. Defaults to ``upgrades``.
- :param downgrade_token: when running "alembic revision" with the ``--autogenerate``
- option, the text of the candidate downgrade operations will be present in this
- template variable when ``script.py.mako`` is rendered. Defaults to ``downgrades``.
+ :param upgrade_token: When autogenerate completes, the text of the
+ candidate upgrade operations will be present in this template
+ variable when ``script.py.mako`` is rendered. Defaults to ``upgrades``.
+ :param downgrade_token: When autogenerate completes, the text of the
+ candidate downgrade operations will be present in this
+ template variable when ``script.py.mako`` is rendered. Defaults to
+ ``downgrades``.
:param sqlalchemy_module_prefix: When autogenerate refers to SQLAlchemy
:class:`~sqlalchemy.schema.Column` or type classes, this prefix will be used
Note that when dialect-specific types are rendered, autogenerate
will render them using the dialect module name, i.e. ``mssql.BIT()``,
``postgresql.UUID()``.
+
+ Parameters specific to individual backends:
+
+ :param mssql_batch_separator: The "batch separator" which will be placed
+ between each statement when generating offline SQL Server
+ migrations. Defaults to ``GO``. Note this is in addition to the customary
+ semicolon ``;`` at the end of each statement; SQL Server considers
+ the "batch separator" to denote the end of an individual statement
+ execution, and cannot group certain dependent operations in
+ one step.
"""
opts['upgrade_token'] = upgrade_token
opts['downgrade_token'] = downgrade_token
opts['sqlalchemy_module_prefix'] = sqlalchemy_module_prefix
+ opts.update(kw)
+
_context = Context(
dialect, _script, connection,
opts['fn'],
transactional_ddl = False
    def __init__(self, dialect, connection, as_sql,
-                            transactional_ddl, output_buffer):
+                            transactional_ddl, output_buffer,
+                            context_opts):
        self.dialect = dialect
        self.connection = connection
        self.as_sql = as_sql
        self.output_buffer = output_buffer
        self.memo = {}
+        # full dictionary of options passed to context.configure();
+        # dialect-specific impls (e.g. MSSQLImpl) read their own
+        # backend-prefixed keys out of this dict
+        self.context_opts = context_opts
        if transactional_ddl is not None:
            self.transactional_ddl = transactional_ddl
class MSSQLImpl(DefaultImpl):
__dialect__ = 'mssql'
transactional_ddl = True
+ batch_separator = "GO"
+
+    def __init__(self, *arg, **kw):
+        super(MSSQLImpl, self).__init__(*arg, **kw)
+        # allow context.configure(mssql_batch_separator=...) to override
+        # the class-level default of "GO"
+        self.batch_separator = self.context_opts.get(
+            "mssql_batch_separator",
+            self.batch_separator)
def start_migrations(self):
self.__dict__.pop('const_sym_counter', None)
def const_sym_counter(self):
return 1
+    def _exec(self, construct, *args, **kw):
+        # emit the statement as usual, then append the batch separator
+        # (e.g. "GO") when generating offline --sql output; SQL Server
+        # tools use the separator to delimit statement execution
+        super(MSSQLImpl, self)._exec(construct, *args, **kw)
+        if self.as_sql and self.batch_separator:
+            self.static_output(self.batch_separator)
+
def emit_begin(self):
self._exec("BEGIN TRANSACTION")
+    def alter_column(self, table_name, column_name,
+                        nullable=None,
+                        server_default=False,
+                        name=None,
+                        type_=None,
+                        schema=None,
+                        existing_type=None,
+                        existing_server_default=None,
+                        existing_nullable=None
+    ):
+
+        # SQL Server's ALTER COLUMN statement must restate the column's
+        # type when changing NULL/NOT NULL, so a nullability change
+        # requires either existing_type or a new type_ to render.
+        if nullable is not None and existing_type is None:
+            if type_ is not None:
+                existing_type = type_
+                # the NULL/NOT NULL alter will handle
+                # the type alteration
+                type_ = None
+            else:
+                raise util.CommandError(
+                    "MS-SQL ALTER COLUMN operations "
+                    "with NULL or NOT NULL require the "
+                    "existing_type or a new type_ be passed.")
+
+        super(MSSQLImpl, self).alter_column(
+            table_name, column_name,
+            nullable=nullable,
+            server_default=server_default,
+            name=name,
+            type_=type_,
+            schema=schema,
+            existing_type=existing_type,
+            existing_server_default=existing_server_default,
+            existing_nullable=existing_nullable
+        )
+
def bulk_insert(self, table, rows):
if self.as_sql:
self._exec(
@compiles(ColumnNullable, 'mssql')
def visit_column_nullable(element, compiler, **kw):
+    # MSSQL restates the column's type in ALTER COLUMN; existing_type
+    # is rendered between the column name and the NULL/NOT NULL keyword
+    # (note MSSQL uses plain "NOT NULL", not "SET NOT NULL")
-    return "%s %s %s" % (
+    return "%s %s %s %s" % (
        alter_table(compiler, element.table_name, element.schema),
        alter_column(compiler, element.column_name),
-        "NULL" if element.nullable else "SET NOT NULL"
+        compiler.dialect.type_compiler.process(element.existing_type),
+        "NULL" if element.nullable else "NOT NULL"
    )
assert_string.replace("\n", "").replace("\t", "")
)
-def capture_context_buffer(transactional_ddl=None):
+def capture_context_buffer(**kw):
+    # test helper: capture offline-mode ("--sql") output into a StringIO
+    # buffer; keyword arguments are passed through to context.configure()
+    # so tests can set per-run options such as mssql_batch_separator
    buf = StringIO.StringIO()
-    if transactional_ddl is not None:
-        context._context_opts['transactional_ddl'] = \
-            transactional_ddl
-
    class capture(object):
        def __enter__(self):
            context.configure(
                dialect_name="sqlite",
                output_buffer = buf,
                **kw
            )
            return buf
-        def __exit__(self, *arg, **kw):
+        def __exit__(self, *arg, **kwarg):
            print buf.getvalue()
-            context._context_opts.pop('output_buffer', None)
+            # pop only the options this helper installed so they don't
+            # leak into subsequent tests via the module-global opts
+            for k in kw:
+                context._context_opts.pop(k, None)
    return capture()
"""Test op functions against MSSQL."""
-from tests import op_fixture, capture_context_buffer, no_sql_testing_config, staging_env, three_rev_fixture, clear_staging_env
-from alembic import op, command
+from tests import op_fixture, capture_context_buffer, \
+ no_sql_testing_config, assert_raises_message, staging_env, \
+ three_rev_fixture, clear_staging_env
+from alembic import op, command, util
from sqlalchemy import Integer, Column, ForeignKey, \
UniqueConstraint, Table, MetaData, String
from sqlalchemy.sql import table
assert "BEGIN TRANSACTION" in buf.getvalue()
assert "COMMIT" in buf.getvalue()
+    def test_batch_separator_default(self):
+        # offline MSSQL output should include the default "GO" separator
+        with capture_context_buffer() as buf:
+            command.upgrade(self.cfg, self.a, sql=True)
+        assert "GO" in buf.getvalue()
+
+    def test_batch_separator_custom(self):
+        # mssql_batch_separator option replaces "GO" in offline output
+        with capture_context_buffer(mssql_batch_separator="BYE") as buf:
+            command.upgrade(self.cfg, self.a, sql=True)
+        assert "BYE" in buf.getvalue()
+
class OpTest(TestCase):
def test_add_column(self):
context = op_fixture('mssql')
context.assert_contains("exec('alter table t1 drop constraint ' + @const_name_2)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c2")
-    def test_alter_column_nullable(self):
+    def test_alter_column_nullable_w_existing_type(self):
+        # existing_type supplies the restated type MSSQL requires
+        # in ALTER COLUMN ... NULL statements
+        context = op_fixture('mssql')
+        op.alter_column("t", "c", nullable=True, existing_type=Integer)
+        context.assert_(
+            "ALTER TABLE t ALTER COLUMN c INTEGER NULL"
+        )
+
+    def test_alter_column_not_nullable_w_existing_type(self):
        context = op_fixture('mssql')
-        op.alter_column("t", "c", nullable=True)
+        op.alter_column("t", "c", nullable=False, existing_type=Integer)
        context.assert_(
-            "ALTER TABLE t ALTER COLUMN c NULL"
+            "ALTER TABLE t ALTER COLUMN c INTEGER NOT NULL"
        )
-    def test_alter_column_not_nullable(self):
+    def test_alter_column_nullable_w_new_type(self):
        context = op_fixture('mssql')
-        op.alter_column("t", "c", nullable=False)
+        op.alter_column("t", "c", nullable=True, type_=Integer)
        context.assert_(
-            "ALTER TABLE t ALTER COLUMN c SET NOT NULL"
+            "ALTER TABLE t ALTER COLUMN c INTEGER NULL"
+        )
+
+    def test_alter_column_not_nullable_w_new_type(self):
+        context = op_fixture('mssql')
+        op.alter_column("t", "c", nullable=False, type_=Integer)
+        context.assert_(
+            "ALTER TABLE t ALTER COLUMN c INTEGER NOT NULL"
+        )
+
+    def test_alter_column_nullable_type_required(self):
+        # nullability change with no type info should raise CommandError
+        context = op_fixture('mssql')
+        assert_raises_message(
+            util.CommandError,
+            "MS-SQL ALTER COLUMN operations with NULL or "
+            "NOT NULL require the existing_type or a new "
+            "type_ be passed.",
+            op.alter_column, "t", "c", nullable=False
        )
# TODO: when we add schema support