class BatchOperationsImpl(object):
- def __init__(self, operations, table_name, schema, recreate, copy_from):
+ def __init__(self, operations, table_name, schema, recreate,
+ copy_from, table_args, table_kwargs):
self.operations = operations
self.table_name = table_name
self.schema = schema
"recreate may be one of 'auto', 'always', or 'never'.")
self.recreate = recreate
self.copy_from = copy_from
+ self.table_args = table_args
+ self.table_kwargs = table_kwargs
self.batch = []
@property
self.table_name, m1, schema=self.schema,
autoload=True, autoload_with=self.operations.get_bind())
- batch_impl = ApplyBatchImpl(existing_table)
+ batch_impl = ApplyBatchImpl(
+ existing_table, self.table_args, self.table_kwargs)
for opname, arg, kw in self.batch:
fn = getattr(batch_impl, opname)
fn(*arg, **kw)
class ApplyBatchImpl(object):
- def __init__(self, table):
+ def __init__(self, table, table_args, table_kwargs):
self.table = table # this is a Table object
+ self.table_args = table_args
+ self.table_kwargs = table_kwargs
self.new_table = None
self.column_transfers = OrderedDict(
- (c.name, {}) for c in self.table.c
+ (c.name, {'expr': c}) for c in self.table.c
)
self._grab_table_elements()
m = MetaData()
schema = self.table.schema
self.new_table = new_table = Table(
- '_alembic_batch_temp', m, *self.columns.values(), schema=schema)
+ '_alembic_batch_temp', m,
+ *(list(self.columns.values()) + list(self.table_args)),
+ schema=schema,
+ **self.table_kwargs)
for const in list(self.named_constraints.values()) + \
self.unnamed_constraints:
- const_columns = set([c.key for c in const.columns])
+
+ const_columns = set([
+ c.key for c in self._constraint_columns(const)])
+
if not const_columns.issubset(self.column_transfers):
continue
const_copy = const.copy(schema=schema, target_table=new_table)
*[new_table.c[col] for col in index.columns.keys()],
**index.kwargs)
+ def _constraint_columns(self, constraint):
+ if isinstance(constraint, ForeignKeyConstraint):
+ return [fk.parent for fk in constraint.elements]
+ else:
+ return list(constraint.columns)
+
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
parts = spec.split(".")
op_impl._exec(
self.new_table.insert(inline=True).from_select(
- list(self.column_transfers.keys()),
+ list(k for k, transfer in
+ self.column_transfers.items() if 'expr' in transfer),
select([
- self.table.c[key]
- for key in self.column_transfers
+ transfer['expr']
+ for transfer in self.column_transfers.values()
+ if 'expr' in transfer
])
)
)
def add_column(self, table_name, column, **kw):
self.columns[column.name] = column
+ self.column_transfers[column.name] = {}
def drop_column(self, table_name, column, **kw):
del self.columns[column.name]
del self.column_transfers[column.name]
def add_constraint(self, const):
- raise NotImplementedError("TODO")
+ if not const.name:
+ raise ValueError("Constraint must have a name")
+ self.named_constraints[const.name] = const
def drop_constraint(self, const):
- raise NotImplementedError("TODO")
+ if not const.name:
+ raise ValueError("Constraint must have a name")
+ try:
+ del self.named_constraints[const.name]
+ except KeyError:
+ raise ValueError("No such constraint: '%s'" % const.name)
def rename_table(self, *arg, **kw):
raise NotImplementedError("TODO")
@contextmanager
def batch_alter_table(
- self, table_name, schema=None, recreate="auto", copy_from=None):
+ self, table_name, schema=None, recreate="auto", copy_from=None,
+ table_args=(), table_kwargs=util.immutabledict()):
"""Invoke a series of per-table migrations in batch.
Batch mode allows a series of operations specific to a table
:param copy_from: optional :class:`~sqlalchemy.schema.Table` object
that will act as the structure of the table being copied. If omitted,
table reflection is used to retrieve the structure of the table.
+ :param table_args: a sequence of additional positional arguments that
+ will be applied to the new :class:`~sqlalchemy.schema.Table` when
+ created, in addition to those copied from the source table.
+ This may be used to provide additional constraints such as CHECK
+ constraints that may not be reflected.
+ :param table_kwargs: a dictionary of additional keyword arguments
+ that will be applied to the new :class:`~sqlalchemy.schema.Table`
+ when created, in addition to those copied from the source table.
+ This may be used to provide for additional table options that may
+ not be reflected.
.. versionadded:: 0.7.0
"""
impl = batch.BatchOperationsImpl(
- self, table_name, schema, recreate, copy_from)
+ self, table_name, schema, recreate,
+ copy_from, table_args, table_kwargs)
batch_op = BatchOperations(self.migration_context, impl=impl)
yield batch_op
impl.flush()
"""
+ def _noop(self, operation):
+ raise NotImplementedError(
+ "The %s method does not apply to a batch table alter operation."
+ % operation)
+
def add_column(self, column):
"""Issue an "add column" instruction using the current
batch migration context.
"""
- def create_foreign_key(self, name, referent, local_cols,
- remote_cols, onupdate=None, ondelete=None,
- deferrable=None, initially=None, match=None,
- referent_schema=None,
- **dialect_kw):
+ def create_foreign_key(
+ self, name, referent, local_cols, remote_cols, **kw):
"""Issue a "create foreign key" instruction using the
current batch migration context.
:meth:`.Operations.create_foreign_key`
"""
+ return super(BatchOperations, self).create_foreign_key(
+ name, self.impl.table_name, referent, local_cols, remote_cols,)
def create_unique_constraint(self, name, local_cols, **kw):
"""Issue a "create unique constraint" instruction using the
:meth:`.Operations.create_unique_constraint`
"""
+ return super(BatchOperations, self).create_unique_constraint(
+ name, self.impl.table_name, local_cols, **kw)
def create_check_constraint(self, name, condition, **kw):
"""Issue a "create check constraint" instruction using the
"""
- def create_index(self, name, table_name, columns, schema=None,
- unique=False, quote=None, **kw):
- """Issue a "create index" instruction using the
- current batch migration context.
-
- The batch form of this call omits the ``table_name`` and ``schema``
- arguments from the call.
-
- .. seealso::
-
- :meth:`.Operations.create_index`
-
- """
-
- def drop_index(self, name):
- """Issue a "drop index" instruction using the
- current batch migration context.
-
- The batch form of this call omits the ``table_name`` and ``schema``
- arguments from the call.
-
- .. seealso::
-
- :meth:`.Operations.drop_index`
-
- """
-
def drop_constraint(self, name, type_=None):
"""Issue a "drop constraint" instruction using the
current batch migration context.
:meth:`.Operations.drop_constraint`
"""
+ return super(BatchOperations, self).drop_constraint(
+ name, self.impl.table_name, type_=type_)
+
+ def create_index(self, *arg, **kw):
+ """Not implemented for batch table operations."""
+ self._noop("create_index")
+
+ def drop_index(self, name):
+ """Not implemented for batch table operations."""
+ self._noop("drop_index")
+
from alembic.batch import ApplyBatchImpl
from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
- UniqueConstraint, Index, CheckConstraint, PrimaryKeyConstraint, \
- ForeignKeyConstraint
+ UniqueConstraint, ForeignKeyConstraint
from sqlalchemy.sql import column
from sqlalchemy.schema import CreateTable
class BatchApplyTest(TestBase):
- def _simple_fixture(self):
+ def setUp(self):
+ self.op = Operations(mock.Mock(opts={}))
+
+ def _simple_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('id', Integer, primary_key=True),
- Column('x', String()),
+ Column('x', String(10)),
Column('y', Integer)
)
- return ApplyBatchImpl(t)
+ return ApplyBatchImpl(t, table_args, table_kwargs)
+
+ def _uq_fixture(self, table_args=(), table_kwargs={}):
+ m = MetaData()
+ t = Table(
+ 'tname', m,
+ Column('id', Integer, primary_key=True),
+ Column('x', String()),
+ Column('y', Integer),
+ UniqueConstraint('y', name='uq1')
+ )
+ return ApplyBatchImpl(t, table_args, table_kwargs)
- def _fk_fixture(self):
+ def _fk_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('email', String()),
Column('user_id', Integer, ForeignKey('user.id'))
)
- return ApplyBatchImpl(t)
+ return ApplyBatchImpl(t, table_args, table_kwargs)
- def _selfref_fk_fixture(self):
+ def _named_fk_fixture(self, table_args=(), table_kwargs={}):
+ m = MetaData()
+ t = Table(
+ 'tname', m,
+ Column('id', Integer, primary_key=True),
+ Column('email', String()),
+ Column('user_id', Integer, ForeignKey('user.id', name='ufk'))
+ )
+ return ApplyBatchImpl(t, table_args, table_kwargs)
+
+ def _selfref_fk_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
t = Table(
'tname', m,
Column('parent_id', ForeignKey('tname.id')),
Column('data', String)
)
- return ApplyBatchImpl(t)
+ return ApplyBatchImpl(t, table_args, table_kwargs)
- def _assert_impl(self, impl, colnames=None):
- context = op_fixture()
+ def _assert_impl(self, impl, colnames=None,
+ ddl_contains=None, ddl_not_contains=None,
+ dialect='default'):
+ context = op_fixture(dialect=dialect)
impl._create(context.impl)
create_stmt = str(
CreateTable(impl.new_table).compile(dialect=context.dialect))
create_stmt = re.sub(r'[\n\t]', '', create_stmt)
- if pk_cols:
- assert "PRIMARY KEY" in create_stmt
- else:
- assert "PRIMARY KEY" not in create_stmt
+
+ if ddl_contains:
+ assert ddl_contains in create_stmt
+ if ddl_not_contains:
+ assert ddl_not_contains not in create_stmt
context.assert_(
create_stmt,
'INSERT INTO _alembic_batch_temp (%(colnames)s) '
'SELECT %(tname_colnames)s FROM tname' % {
"colnames": ", ".join([
- impl.new_table.c[name].name for name in colnames]),
+ impl.new_table.c[name].name
+ for name in colnames if name in impl.table.c]),
"tname_colnames":
- ", ".join("tname.%s" % name for name in colnames)
+ ", ".join("tname.%s" % name
+ for name in colnames if name in impl.table.c)
},
'DROP TABLE tname',
'ALTER TABLE _alembic_batch_temp RENAME TO tname'
def test_rename_col_pk(self):
impl = self._simple_fixture()
impl.alter_column('tname', 'id', new_column_name='foobar')
- new_table = self._assert_impl(impl)
+ new_table = self._assert_impl(
+ impl, ddl_contains="PRIMARY KEY (foobar)")
eq_(new_table.c.id.name, 'foobar')
eq_(list(new_table.primary_key), [new_table.c.id])
impl = self._fk_fixture()
impl.alter_column('tname', 'user_id', new_column_name='foobar')
new_table = self._assert_impl(
- impl, colnames=['id', 'email', 'user_id'])
+ impl, colnames=['id', 'email', 'user_id'],
+ ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
eq_(new_table.c.user_id.name, 'foobar')
eq_(
list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
def test_drop_col_remove_pk(self):
impl = self._simple_fixture()
impl.drop_column('tname', column('id'))
- new_table = self._assert_impl(impl, colnames=['x', 'y'])
+ new_table = self._assert_impl(
+ impl, colnames=['x', 'y'], ddl_not_contains="PRIMARY KEY")
assert 'y' in new_table.c
assert 'id' not in new_table.c
assert not new_table.primary_key
def test_drop_col_remove_fk(self):
impl = self._fk_fixture()
impl.drop_column('tname', column('user_id'))
- new_table = self._assert_impl(impl, colnames=['id', 'email'])
+ new_table = self._assert_impl(
+ impl, colnames=['id', 'email'], ddl_not_contains="FOREIGN KEY")
assert 'user_id' not in new_table.c
assert not new_table.foreign_keys
def test_drop_col_retain_fk(self):
impl = self._fk_fixture()
impl.drop_column('tname', column('email'))
- new_table = self._assert_impl(impl, colnames=['id', 'user_id'])
+ new_table = self._assert_impl(
+ impl, colnames=['id', 'user_id'],
+ ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)')
assert 'email' not in new_table.c
assert new_table.c.user_id.foreign_keys
assert 'data' not in new_table.c
assert new_table.c.parent_id.foreign_keys
+ def test_add_fk(self):
+ impl = self._simple_fixture()
+ impl.add_column('tname', Column('user_id', Integer))
+ fk = self.op._foreign_key_constraint(
+ 'fk1', 'tname', 'user',
+ ['user_id'], ['id'])
+ impl.add_constraint(fk)
+ new_table = self._assert_impl(
+ impl, colnames=['id', 'x', 'y', 'user_id'],
+ ddl_contains='CONSTRAINT fk1 FOREIGN KEY(user_id) '
+ 'REFERENCES "user" (id)')
+ eq_(
+ list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
+ 'user.id'
+ )
+
+ def test_drop_fk(self):
+ impl = self._named_fk_fixture()
+ fk = ForeignKeyConstraint([], [], name='ufk')
+ impl.drop_constraint(fk)
+ new_table = self._assert_impl(
+ impl, colnames=['id', 'email', 'user_id'],
+ ddl_not_contains="CONSTRANT fk1")
+ eq_(
+ list(new_table.foreign_keys),
+ []
+ )
+
+ def test_add_uq(self):
+ impl = self._simple_fixture()
+ uq = self.op._unique_constraint(
+ 'uq1', 'tname', ['y']
+ )
+
+ impl.add_constraint(uq)
+ self._assert_impl(
+ impl, colnames=['id', 'x', 'y'],
+ ddl_contains="CONSTRAINT uq1 UNIQUE")
+
+ def test_drop_uq(self):
+ impl = self._uq_fixture()
+
+ uq = self.op._unique_constraint(
+ 'uq1', 'tname', ['y']
+ )
+ impl.drop_constraint(uq)
+ self._assert_impl(
+ impl, colnames=['id', 'x', 'y'],
+ ddl_not_contains="CONSTRAINT uq1 UNIQUE")
+
+ def test_add_table_opts(self):
+ impl = self._simple_fixture(table_kwargs={'mysql_engine': 'InnoDB'})
+ self._assert_impl(
+ impl, ddl_contains="ENGINE=InnoDB",
+ dialect='mysql'
+ )
+
class BatchAPITest(TestBase):
@contextmanager
[mock.call.add_column(
'tname', column, schema=None)]
)
+
+ def test_create_fk(self):
+ with self._fixture() as batch:
+ batch.create_foreign_key('myfk', 'user', ['x'], ['y'])
+
+ eq_(
+ self.mock_schema.ForeignKeyConstraint.mock_calls,
+ [
+ mock.call(
+ ['x'], ['user.y'],
+ onupdate=None, ondelete=None, name='myfk',
+ initially=None, deferrable=None, match=None)
+ ]
+ )
+ eq_(
+ batch.impl.operations.impl.mock_calls,
+ [mock.call.add_constraint(
+ self.mock_schema.ForeignKeyConstraint())]
+ )
+
+ def test_create_uq(self):
+ with self._fixture() as batch:
+ batch.create_unique_constraint('uq1', ['a', 'b'])
+
+ eq_(
+ self.mock_schema.Table().c.__getitem__.mock_calls,
+ [mock.call('a'), mock.call('b')]
+ )
+
+ eq_(
+ self.mock_schema.UniqueConstraint.mock_calls,
+ [
+ mock.call(
+ self.mock_schema.Table().c.__getitem__(),
+ self.mock_schema.Table().c.__getitem__(),
+ name='uq1'
+ )
+ ]
+ )
+ eq_(
+ batch.impl.operations.impl.mock_calls,
+ [mock.call.add_constraint(
+ self.mock_schema.UniqueConstraint())]
+ )
+
+ def test_drop_constraint(self):
+ with self._fixture() as batch:
+ batch.drop_constraint('uq1')
+
+ eq_(
+ self.mock_schema.Constraint.mock_calls,
+ [
+ mock.call(name='uq1')
+ ]
+ )
+ eq_(
+ batch.impl.operations.impl.mock_calls,
+ [mock.call.drop_constraint(self.mock_schema.Constraint())]
+ )