]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- finish up most features
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Nov 2014 22:51:34 +0000 (17:51 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Nov 2014 22:51:34 +0000 (17:51 -0500)
alembic/batch.py
alembic/operations.py
tests/test_batch.py

index 35b7494f7ccc821e73e42b3dff80fcf7a8c14eda..5f745116163b5ae8eb26faf113c5f27e8bd75ee4 100644 (file)
@@ -5,7 +5,8 @@ from sqlalchemy.util import OrderedDict
 
 
 class BatchOperationsImpl(object):
-    def __init__(self, operations, table_name, schema, recreate, copy_from):
+    def __init__(self, operations, table_name, schema, recreate,
+                 copy_from, table_args, table_kwargs):
         self.operations = operations
         self.table_name = table_name
         self.schema = schema
@@ -14,6 +15,8 @@ class BatchOperationsImpl(object):
                 "recreate may be one of 'auto', 'always', or 'never'.")
         self.recreate = recreate
         self.copy_from = copy_from
+        self.table_args = table_args
+        self.table_kwargs = table_kwargs
         self.batch = []
 
     @property
@@ -45,7 +48,8 @@ class BatchOperationsImpl(object):
                 self.table_name, m1, schema=self.schema,
                 autoload=True, autoload_with=self.operations.get_bind())
 
-            batch_impl = ApplyBatchImpl(existing_table)
+            batch_impl = ApplyBatchImpl(
+                existing_table, self.table_args, self.table_kwargs)
             for opname, arg, kw in self.batch:
                 fn = getattr(batch_impl, opname)
                 fn(*arg, **kw)
@@ -84,11 +88,13 @@ class BatchOperationsImpl(object):
 
 
 class ApplyBatchImpl(object):
-    def __init__(self, table):
+    def __init__(self, table, table_args, table_kwargs):
         self.table = table  # this is a Table object
+        self.table_args = table_args
+        self.table_kwargs = table_kwargs
         self.new_table = None
         self.column_transfers = OrderedDict(
-            (c.name, {}) for c in self.table.c
+            (c.name, {'expr': c}) for c in self.table.c
         )
         self._grab_table_elements()
 
@@ -116,11 +122,17 @@ class ApplyBatchImpl(object):
         m = MetaData()
         schema = self.table.schema
         self.new_table = new_table = Table(
-            '_alembic_batch_temp', m, *self.columns.values(), schema=schema)
+            '_alembic_batch_temp', m,
+            *(list(self.columns.values()) + list(self.table_args)),
+            schema=schema,
+            **self.table_kwargs)
 
         for const in list(self.named_constraints.values()) + \
                 self.unnamed_constraints:
-            const_columns = set([c.key for c in const.columns])
+
+            const_columns = set([
+                c.key for c in self._constraint_columns(const)])
+
             if not const_columns.issubset(self.column_transfers):
                 continue
             const_copy = const.copy(schema=schema, target_table=new_table)
@@ -134,6 +146,12 @@ class ApplyBatchImpl(object):
                   *[new_table.c[col] for col in index.columns.keys()],
                   **index.kwargs)
 
+    def _constraint_columns(self, constraint):
+        if isinstance(constraint, ForeignKeyConstraint):
+            return [fk.parent for fk in constraint.elements]
+        else:
+            return list(constraint.columns)
+
     def _setup_referent(self, metadata, constraint):
         spec = constraint.elements[0]._get_colspec()
         parts = spec.split(".")
@@ -156,10 +174,12 @@ class ApplyBatchImpl(object):
 
         op_impl._exec(
             self.new_table.insert(inline=True).from_select(
-                list(self.column_transfers.keys()),
+                list(k for k, transfer in
+                     self.column_transfers.items() if 'expr' in transfer),
                 select([
-                    self.table.c[key]
-                    for key in self.column_transfers
+                    transfer['expr']
+                    for transfer in self.column_transfers.values()
+                    if 'expr' in transfer
                 ])
             )
         )
@@ -200,16 +220,24 @@ class ApplyBatchImpl(object):
 
     def add_column(self, table_name, column, **kw):
         self.columns[column.name] = column
+        self.column_transfers[column.name] = {}
 
     def drop_column(self, table_name, column, **kw):
         del self.columns[column.name]
         del self.column_transfers[column.name]
 
     def add_constraint(self, const):
-        raise NotImplementedError("TODO")
+        if not const.name:
+            raise ValueError("Constraint must have a name")
+        self.named_constraints[const.name] = const
 
     def drop_constraint(self, const):
-        raise NotImplementedError("TODO")
+        if not const.name:
+            raise ValueError("Constraint must have a name")
+        try:
+            del self.named_constraints[const.name]
+        except KeyError:
+            raise ValueError("No such constraint: '%s'" % const.name)
 
     def rename_table(self, *arg, **kw):
         raise NotImplementedError("TODO")
index 7d3345f8f893e9ef6c56312def729e2bdb21cc96..9a869d01e9ee296ce92e101c1855361110174135 100644 (file)
@@ -191,7 +191,8 @@ class Operations(object):
 
     @contextmanager
     def batch_alter_table(
-            self, table_name, schema=None, recreate="auto", copy_from=None):
+            self, table_name, schema=None, recreate="auto", copy_from=None,
+            table_args=(), table_kwargs=util.immutabledict()):
         """Invoke a series of per-table migrations in batch.
 
         Batch mode allows a series of operations specific to a table
@@ -250,6 +251,16 @@ class Operations(object):
         :param copy_from: optional :class:`~sqlalchemy.schema.Table` object
          that will act as the structure of the table being copied.  If omitted,
          table reflection is used to retrieve the structure of the table.
+        :param table_args: a sequence of additional positional arguments that
+         will be applied to the new :class:`~sqlalchemy.schema.Table` when
+         created, in addition to those copied from the source table.
+         This may be used to provide additional constraints such as CHECK
+         constraints that may not be reflected.
+        :param table_kwargs: a dictionary of additional keyword arguments
+         that will be applied to the new :class:`~sqlalchemy.schema.Table`
+         when created, in addition to those copied from the source table.
+         This may be used to provide for additional table options that may
+         not be reflected.
 
         .. versionadded:: 0.7.0
 
@@ -259,7 +270,8 @@ class Operations(object):
 
         """
         impl = batch.BatchOperationsImpl(
-            self, table_name, schema, recreate, copy_from)
+            self, table_name, schema, recreate,
+            copy_from, table_args, table_kwargs)
         batch_op = BatchOperations(self.migration_context, impl=impl)
         yield batch_op
         impl.flush()
@@ -1265,6 +1277,11 @@ class BatchOperations(Operations):
 
     """
 
+    def _noop(self, operation):
+        raise NotImplementedError(
+            "The %s method does not apply to a batch table alter operation."
+            % operation)
+
     def add_column(self, column):
         """Issue an "add column" instruction using the current
         batch migration context.
@@ -1316,11 +1333,8 @@ class BatchOperations(Operations):
 
         """
 
-    def create_foreign_key(self, name, referent, local_cols,
-                           remote_cols, onupdate=None, ondelete=None,
-                           deferrable=None, initially=None, match=None,
-                           referent_schema=None,
-                           **dialect_kw):
+    def create_foreign_key(
+            self, name, referent, local_cols, remote_cols, **kw):
         """Issue a "create foreign key" instruction using the
         current batch migration context.
 
@@ -1339,6 +1353,8 @@ class BatchOperations(Operations):
             :meth:`.Operations.create_foreign_key`
 
         """
+        return super(BatchOperations, self).create_foreign_key(
+            name, self.impl.table_name, referent, local_cols, remote_cols,)
 
     def create_unique_constraint(self, name, local_cols, **kw):
         """Issue a "create unique constraint" instruction using the
@@ -1352,6 +1368,8 @@ class BatchOperations(Operations):
             :meth:`.Operations.create_unique_constraint`
 
         """
+        return super(BatchOperations, self).create_unique_constraint(
+            name, self.impl.table_name, local_cols, **kw)
 
     def create_check_constraint(self, name, condition, **kw):
         """Issue a "create check constraint" instruction using the
@@ -1366,33 +1384,6 @@ class BatchOperations(Operations):
 
         """
 
-    def create_index(self, name, table_name, columns, schema=None,
-                     unique=False, quote=None, **kw):
-        """Issue a "create index" instruction using the
-        current batch migration context.
-
-        The batch form of this call omits the ``table_name`` and ``schema``
-        arguments from the call.
-
-        .. seealso::
-
-            :meth:`.Operations.create_index`
-
-        """
-
-    def drop_index(self, name):
-        """Issue a "drop index" instruction using the
-        current batch migration context.
-
-        The batch form of this call omits the ``table_name`` and ``schema``
-        arguments from the call.
-
-        .. seealso::
-
-            :meth:`.Operations.drop_index`
-
-        """
-
     def drop_constraint(self, name, type_=None):
         """Issue a "drop constraint" instruction using the
         current batch migration context.
@@ -1405,3 +1396,14 @@ class BatchOperations(Operations):
             :meth:`.Operations.drop_constraint`
 
         """
+        return super(BatchOperations, self).drop_constraint(
+            name, self.impl.table_name, type_=type_)
+
+    def create_index(self, *arg, **kw):
+        """Not implemented for batch table operations."""
+        self._noop("create_index")
+
+    def drop_index(self, name):
+        """Not implemented for batch table operations."""
+        self._noop("drop_index")
+
index c4a309e21428d76f73f2b4b6a113149b31ce4ecd..dfddbc59f84b13643d9578d1ffc85ba8f081ec3c 100644 (file)
@@ -8,24 +8,37 @@ from alembic.operations import Operations
 from alembic.batch import ApplyBatchImpl
 
 from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
-    UniqueConstraint, Index, CheckConstraint, PrimaryKeyConstraint, \
-    ForeignKeyConstraint
+    UniqueConstraint, ForeignKeyConstraint
 from sqlalchemy.sql import column
 from sqlalchemy.schema import CreateTable
 
 
 class BatchApplyTest(TestBase):
-    def _simple_fixture(self):
+    def setUp(self):
+        self.op = Operations(mock.Mock(opts={}))
+
+    def _simple_fixture(self, table_args=(), table_kwargs={}):
         m = MetaData()
         t = Table(
             'tname', m,
             Column('id', Integer, primary_key=True),
-            Column('x', String()),
+            Column('x', String(10)),
             Column('y', Integer)
         )
-        return ApplyBatchImpl(t)
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
+    def _uq_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('x', String()),
+            Column('y', Integer),
+            UniqueConstraint('y', name='uq1')
+        )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
 
-    def _fk_fixture(self):
+    def _fk_fixture(self, table_args=(), table_kwargs={}):
         m = MetaData()
         t = Table(
             'tname', m,
@@ -33,9 +46,19 @@ class BatchApplyTest(TestBase):
             Column('email', String()),
             Column('user_id', Integer, ForeignKey('user.id'))
         )
-        return ApplyBatchImpl(t)
+        return ApplyBatchImpl(t, table_args, table_kwargs)
 
-    def _selfref_fk_fixture(self):
+    def _named_fk_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('email', String()),
+            Column('user_id', Integer, ForeignKey('user.id', name='ufk'))
+        )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
+    def _selfref_fk_fixture(self, table_args=(), table_kwargs={}):
         m = MetaData()
         t = Table(
             'tname', m,
@@ -43,10 +66,12 @@ class BatchApplyTest(TestBase):
             Column('parent_id', ForeignKey('tname.id')),
             Column('data', String)
         )
-        return ApplyBatchImpl(t)
+        return ApplyBatchImpl(t, table_args, table_kwargs)
 
-    def _assert_impl(self, impl, colnames=None):
-        context = op_fixture()
+    def _assert_impl(self, impl, colnames=None,
+                     ddl_contains=None, ddl_not_contains=None,
+                     dialect='default'):
+        context = op_fixture(dialect=dialect)
 
         impl._create(context.impl)
 
@@ -60,19 +85,22 @@ class BatchApplyTest(TestBase):
         create_stmt = str(
             CreateTable(impl.new_table).compile(dialect=context.dialect))
         create_stmt = re.sub(r'[\n\t]', '', create_stmt)
-        if pk_cols:
-            assert "PRIMARY KEY" in create_stmt
-        else:
-            assert "PRIMARY KEY" not in create_stmt
+
+        if ddl_contains:
+            assert ddl_contains in create_stmt
+        if ddl_not_contains:
+            assert ddl_not_contains not in create_stmt
 
         context.assert_(
             create_stmt,
             'INSERT INTO _alembic_batch_temp (%(colnames)s) '
             'SELECT %(tname_colnames)s FROM tname' % {
                 "colnames": ", ".join([
-                    impl.new_table.c[name].name for name in colnames]),
+                    impl.new_table.c[name].name
+                    for name in colnames if name in impl.table.c]),
                 "tname_colnames":
-                ", ".join("tname.%s" % name for name in colnames)
+                ", ".join("tname.%s" % name
+                          for name in colnames if name in impl.table.c)
             },
             'DROP TABLE tname',
             'ALTER TABLE _alembic_batch_temp RENAME TO tname'
@@ -94,7 +122,8 @@ class BatchApplyTest(TestBase):
     def test_rename_col_pk(self):
         impl = self._simple_fixture()
         impl.alter_column('tname', 'id', new_column_name='foobar')
-        new_table = self._assert_impl(impl)
+        new_table = self._assert_impl(
+            impl, ddl_contains="PRIMARY KEY (foobar)")
         eq_(new_table.c.id.name, 'foobar')
         eq_(list(new_table.primary_key), [new_table.c.id])
 
@@ -102,7 +131,8 @@ class BatchApplyTest(TestBase):
         impl = self._fk_fixture()
         impl.alter_column('tname', 'user_id', new_column_name='foobar')
         new_table = self._assert_impl(
-            impl, colnames=['id', 'email', 'user_id'])
+            impl, colnames=['id', 'email', 'user_id'],
+            ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
         eq_(new_table.c.user_id.name, 'foobar')
         eq_(
             list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
@@ -119,7 +149,8 @@ class BatchApplyTest(TestBase):
     def test_drop_col_remove_pk(self):
         impl = self._simple_fixture()
         impl.drop_column('tname', column('id'))
-        new_table = self._assert_impl(impl, colnames=['x', 'y'])
+        new_table = self._assert_impl(
+            impl, colnames=['x', 'y'], ddl_not_contains="PRIMARY KEY")
         assert 'y' in new_table.c
         assert 'id' not in new_table.c
         assert not new_table.primary_key
@@ -127,14 +158,17 @@ class BatchApplyTest(TestBase):
     def test_drop_col_remove_fk(self):
         impl = self._fk_fixture()
         impl.drop_column('tname', column('user_id'))
-        new_table = self._assert_impl(impl, colnames=['id', 'email'])
+        new_table = self._assert_impl(
+            impl, colnames=['id', 'email'], ddl_not_contains="FOREIGN KEY")
         assert 'user_id' not in new_table.c
         assert not new_table.foreign_keys
 
     def test_drop_col_retain_fk(self):
         impl = self._fk_fixture()
         impl.drop_column('tname', column('email'))
-        new_table = self._assert_impl(impl, colnames=['id', 'user_id'])
+        new_table = self._assert_impl(
+            impl, colnames=['id', 'user_id'],
+            ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)')
         assert 'email' not in new_table.c
         assert new_table.c.user_id.foreign_keys
 
@@ -145,6 +179,63 @@ class BatchApplyTest(TestBase):
         assert 'data' not in new_table.c
         assert new_table.c.parent_id.foreign_keys
 
+    def test_add_fk(self):
+        impl = self._simple_fixture()
+        impl.add_column('tname', Column('user_id', Integer))
+        fk = self.op._foreign_key_constraint(
+            'fk1', 'tname', 'user',
+            ['user_id'], ['id'])
+        impl.add_constraint(fk)
+        new_table = self._assert_impl(
+            impl, colnames=['id', 'x', 'y', 'user_id'],
+            ddl_contains='CONSTRAINT fk1 FOREIGN KEY(user_id) '
+            'REFERENCES "user" (id)')
+        eq_(
+            list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
+            'user.id'
+        )
+
+    def test_drop_fk(self):
+        impl = self._named_fk_fixture()
+        fk = ForeignKeyConstraint([], [], name='ufk')
+        impl.drop_constraint(fk)
+        new_table = self._assert_impl(
+            impl, colnames=['id', 'email', 'user_id'],
+            ddl_not_contains="CONSTRANT fk1")
+        eq_(
+            list(new_table.foreign_keys),
+            []
+        )
+
+    def test_add_uq(self):
+        impl = self._simple_fixture()
+        uq = self.op._unique_constraint(
+            'uq1', 'tname', ['y']
+        )
+
+        impl.add_constraint(uq)
+        self._assert_impl(
+            impl, colnames=['id', 'x', 'y'],
+            ddl_contains="CONSTRAINT uq1 UNIQUE")
+
+    def test_drop_uq(self):
+        impl = self._uq_fixture()
+
+        uq = self.op._unique_constraint(
+            'uq1', 'tname', ['y']
+        )
+        impl.drop_constraint(uq)
+        self._assert_impl(
+            impl, colnames=['id', 'x', 'y'],
+            ddl_not_contains="CONSTRAINT uq1 UNIQUE")
+
+    def test_add_table_opts(self):
+        impl = self._simple_fixture(table_kwargs={'mysql_engine': 'InnoDB'})
+        self._assert_impl(
+            impl, ddl_contains="ENGINE=InnoDB",
+            dialect='mysql'
+        )
+
 
 class BatchAPITest(TestBase):
     @contextmanager
@@ -179,3 +270,62 @@ class BatchAPITest(TestBase):
             [mock.call.add_column(
                 'tname', column, schema=None)]
         )
+
+    def test_create_fk(self):
+        with self._fixture() as batch:
+            batch.create_foreign_key('myfk', 'user', ['x'], ['y'])
+
+        eq_(
+            self.mock_schema.ForeignKeyConstraint.mock_calls,
+            [
+                mock.call(
+                    ['x'], ['user.y'],
+                    onupdate=None, ondelete=None, name='myfk',
+                    initially=None, deferrable=None, match=None)
+            ]
+        )
+        eq_(
+            batch.impl.operations.impl.mock_calls,
+            [mock.call.add_constraint(
+                self.mock_schema.ForeignKeyConstraint())]
+        )
+
+    def test_create_uq(self):
+        with self._fixture() as batch:
+            batch.create_unique_constraint('uq1', ['a', 'b'])
+
+        eq_(
+            self.mock_schema.Table().c.__getitem__.mock_calls,
+            [mock.call('a'), mock.call('b')]
+        )
+
+        eq_(
+            self.mock_schema.UniqueConstraint.mock_calls,
+            [
+                mock.call(
+                    self.mock_schema.Table().c.__getitem__(),
+                    self.mock_schema.Table().c.__getitem__(),
+                    name='uq1'
+                )
+            ]
+        )
+        eq_(
+            batch.impl.operations.impl.mock_calls,
+            [mock.call.add_constraint(
+                self.mock_schema.UniqueConstraint())]
+        )
+
+    def test_drop_constraint(self):
+        with self._fixture() as batch:
+            batch.drop_constraint('uq1')
+
+        eq_(
+            self.mock_schema.Constraint.mock_calls,
+            [
+                mock.call(name='uq1')
+            ]
+        )
+        eq_(
+            batch.impl.operations.impl.mock_calls,
+            [mock.call.drop_constraint(self.mock_schema.Constraint())]
+        )