]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- Repaired the inspection, copying and rendering of CHECK constraints
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Nov 2014 21:31:30 +0000 (16:31 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Nov 2014 21:31:30 +0000 (16:31 -0500)
and so-called "schema" types such as Boolean, Enum within the batch
copy system; the CHECK constraint will not be "doubled" when the table is
copied, and additionally the inspection of the CHECK constraint for
its member columns will no longer fail with an attribute error.
fixes #249
- Added two new arguments
:paramref:`.Operations.batch_alter_table.reflect_args`
and :paramref:`.Operations.batch_alter_table.reflect_kwargs`, so that
arguments may be passed directly to suit the
:class:`~.sqlalchemy.schema.Table`
object that will be reflected.

alembic/batch.py
alembic/ddl/base.py
alembic/operations.py
alembic/testing/fixtures.py
docs/build/batch.rst
docs/build/changelog.rst
tests/requirements.py
tests/test_batch.py

index f5957caf5f737c4f9b4f886846ef12e5fff0fe36..f5c1f31de90f86c56deed0519bb6413b09084da8 100644 (file)
@@ -3,11 +3,13 @@ from sqlalchemy import Table, MetaData, Index, select, Column, \
 from sqlalchemy import types as sqltypes
 from sqlalchemy.util import OrderedDict
 from . import util
+from .ddl.base import _columns_for_constraint, _is_type_bound
 
 
 class BatchOperationsImpl(object):
     def __init__(self, operations, table_name, schema, recreate,
-                 copy_from, table_args, table_kwargs):
+                 copy_from, table_args, table_kwargs,
+                 reflect_args, reflect_kwargs):
         if not util.sqla_08:
             raise NotImplementedError(
                 "batch mode requires SQLAlchemy 0.8 or greater.")
@@ -21,6 +23,8 @@ class BatchOperationsImpl(object):
         self.copy_from = copy_from
         self.table_args = table_args
         self.table_kwargs = table_kwargs
+        self.reflect_args = reflect_args
+        self.reflect_kwargs = reflect_kwargs
         self.batch = []
 
     @property
@@ -48,9 +52,13 @@ class BatchOperationsImpl(object):
                 fn(*arg, **kw)
         else:
             m1 = MetaData()
+
             existing_table = Table(
-                self.table_name, m1, schema=self.schema,
-                autoload=True, autoload_with=self.operations.get_bind())
+                self.table_name, m1,
+                schema=self.schema,
+                autoload=True,
+                autoload_with=self.operations.get_bind(),
+                *self.reflect_args, **self.reflect_kwargs)
 
             batch_impl = ApplyBatchImpl(
                 existing_table, self.table_args, self.table_kwargs)
@@ -113,6 +121,8 @@ class ApplyBatchImpl(object):
         self.unnamed_constraints = []
         self.indexes = {}
         for const in self.table.constraints:
+            if _is_type_bound(const):
+                continue
             if const.name:
                 self.named_constraints[const.name] = const
             else:
@@ -136,7 +146,7 @@ class ApplyBatchImpl(object):
                 self.unnamed_constraints:
 
             const_columns = set([
-                c.key for c in self._constraint_columns(const)])
+                c.key for c in _columns_for_constraint(const)])
 
             if not const_columns.issubset(self.column_transfers):
                 continue
@@ -151,12 +161,6 @@ class ApplyBatchImpl(object):
                   *[new_table.c[col] for col in index.columns.keys()],
                   **index.kwargs)
 
-    def _constraint_columns(self, constraint):
-        if isinstance(constraint, ForeignKeyConstraint):
-            return [fk.parent for fk in constraint.elements]
-        else:
-            return list(constraint.columns)
-
     def _setup_referent(self, metadata, constraint):
         spec = constraint.elements[0]._get_colspec()
         parts = spec.split(".")
index aa5fbe160d2c701663460f552881ea96bc75811e..32878b10efb22e9defa7ff41f5343b36e718d39e 100644 (file)
@@ -1,9 +1,11 @@
 import functools
 
 from sqlalchemy.ext.compiler import compiles
-from sqlalchemy.schema import DDLElement, Column
+from sqlalchemy.schema import DDLElement, Column, \
+    ForeignKeyConstraint, CheckConstraint
 from sqlalchemy import Integer
 from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.visitors import traverse
 from .. import util
 
 if util.sqla_09:
@@ -152,6 +154,39 @@ def visit_column_default(element, compiler, **kw):
     )
 
 
+def _columns_for_constraint(constraint):
+    if isinstance(constraint, ForeignKeyConstraint):
+        return [fk.parent for fk in constraint.elements]
+    elif isinstance(constraint, CheckConstraint):
+        return _find_columns(constraint.sqltext)
+    else:
+        return list(constraint.columns)
+
+
+def _is_type_bound(constraint):
+    # this deals with SQLAlchemy #3260, don't copy CHECK constraints
+    # that will be generated by the type.
+    if util.sqla_100:
+        # new feature added for #3260
+        return constraint._type_bound
+    else:
+        # old way, look at what we know Boolean/Enum to use
+        return (
+            constraint._create_rule is not None and
+            isinstance(
+                getattr(constraint._create_rule, "target", None),
+                sqltypes.SchemaType)
+        )
+
+
+def _find_columns(clause):
+    """locate Column objects within the given expression."""
+
+    cols = set()
+    traverse(clause, {}, {'column': cols.add})
+    return cols
+
+
 def quote_dotted(name, quote):
     """quote the elements of a dotted name"""
 
index fc52e03eeee1d93bc3d9fa187956735610701ef7..62d48428194b977ef9e6cbc368f6ea56996f9197 100644 (file)
@@ -192,7 +192,8 @@ class Operations(object):
     @contextmanager
     def batch_alter_table(
             self, table_name, schema=None, recreate="auto", copy_from=None,
-            table_args=(), table_kwargs=util.immutabledict()):
+            table_args=(), table_kwargs=util.immutabledict(),
+            reflect_args=(), reflect_kwargs=util.immutabledict()):
         """Invoke a series of per-table migrations in batch.
 
         Batch mode allows a series of operations specific to a table
@@ -251,6 +252,31 @@ class Operations(object):
         :param copy_from: optional :class:`~sqlalchemy.schema.Table` object
          that will act as the structure of the table being copied.  If omitted,
          table reflection is used to retrieve the structure of the table.
+
+         .. seealso::
+
+            :paramref:`~.Operations.batch_alter_table.reflect_args`
+
+            :paramref:`~.Operations.batch_alter_table.reflect_kwargs`
+
+        :param reflect_args: a sequence of additional positional arguments that
+         will be applied to the table structure being reflected / copied;
+         this may be used to pass column and constraint overrides to the
+         table that will be reflected, in lieu of passing the whole
+         :class:`~sqlalchemy.schema.Table` using
+         :paramref:`~.Operations.batch_alter_table.copy_from`.
+
+         .. versionadded:: 0.7.1
+
+        :param reflect_kwargs: a dictionary of additional keyword arguments
+         that will be applied to the table structure being copied; this may be
+         used to pass additional table and reflection options to the table that
+         will be reflected, in lieu of passing the whole
+         :class:`~sqlalchemy.schema.Table` using
+         :paramref:`~.Operations.batch_alter_table.copy_from`.
+
+         .. versionadded:: 0.7.1
+
         :param table_args: a sequence of additional positional arguments that
          will be applied to the new :class:`~sqlalchemy.schema.Table` when
          created, in addition to those copied from the source table.
@@ -273,7 +299,7 @@ class Operations(object):
         """
         impl = batch.BatchOperationsImpl(
             self, table_name, schema, recreate,
-            copy_from, table_args, table_kwargs)
+            copy_from, table_args, table_kwargs, reflect_args, reflect_kwargs)
         batch_op = BatchOperations(self.migration_context, impl=impl)
         yield batch_op
         impl.flush()
index f0b0000ba8e89a152ead892b786edaf8e937399f..6336967fe9cec1aebf91d1e74e8c93b69b8d5917 100644 (file)
@@ -100,7 +100,7 @@ def op_fixture(dialect='default', as_sql=False, naming_convention=None):
             # TODO: this might need to
             # be more like a real connection
             # as tests get more involved
-            self.connection = None
+            self.connection = mock.Mock(dialect=dialect)
 
         def _exec(self, construct, *args, **kw):
             if isinstance(construct, string_types):
index 338ab8a703e6ed1269a5a1950cded6bef78d737f..4c03a668352a14c7a254b5d540d885c24b289a97 100644 (file)
@@ -61,6 +61,55 @@ which is the one kind of column-level ALTER statement that SQLite supports.
 to run "move and copy" unconditionally in all cases, including on databases
 other than SQLite; more on this is below.
 
+.. _batch_controlling_table_reflection:
+
+Controlling Table Reflection
+----------------------------
+
+The :class:`~sqlalchemy.schema.Table` object that is reflected when
+"move and copy" proceeds is performed using the standard ``autoload=True``
+approach.  This call can be affected using the
+:paramref:`~.Operations.batch_alter_table.reflect_args` and
+:paramref:`~.Operations.batch_alter_table.reflect_kwargs` arguments.
+For example, to override a :class:`~sqlalchemy.schema.Column` within
+the reflection process such that a :class:`~sqlalchemy.types.Boolean`
+object is reflected with the ``create_constraint`` flag set to ``False``::
+
+    with self.op.batch_alter_table(
+        "bar",
+        reflect_args=[Column('flag', Boolean(create_constraint=False))]
+    ) as batch_op:
+        batch_op.alter_column(
+            'flag', new_column_name='bflag', existing_type=Boolean)
+
+Another use case, add a listener to the :class:`~sqlalchemy.schema.Table`
+as it is reflected so that special logic can be applied to columns or
+types, using the :meth:`~sqlalchemy.events.DDLEvents.column_reflect` event::
+
+    def listen_for_reflect(inspector, table, column_info):
+        "correct an ENUM type"
+        if column_info['name'] == 'my_enum':
+            column_info['type'] = Enum('a', 'b', 'c')
+
+    with self.op.batch_alter_table(
+        "bar",
+        reflect_kwargs=dict(
+            listeners=[
+                ('column_reflect', listen_for_reflect)
+            ]
+        )
+    ) as batch_op:
+        batch_op.alter_column(
+            'flag', new_column_name='bflag', existing_type=Boolean)
+
+The reflection process may also be bypassed entirely by sending a
+pre-fabricated :class:`~sqlalchemy.schema.Table` object; see
+:ref:`batch_offline_mode` for an example.
+
+.. versionadded:: 0.7.1
+   added :paramref:`.Operations.batch_alter_table.reflect_args`
+   and :paramref:`.Operations.batch_alter_table.reflect_kwargs` options.
+
 
 Dealing with Constraints
 ------------------------
@@ -101,6 +150,8 @@ the recreation of unnamed UNIQUE constraints, either they should be named
 in the first place, or again specified within
 :paramref:`.Operations.batch_alter_table.table_args`.
 
+.. _batch_offline_mode:
+
 Working in Offline Mode
 -----------------------
 
@@ -113,7 +164,7 @@ get this information, which means that "online" mode is required; the
 To support offline mode, the system must work without table reflection
 present, which means the full table as it intends to be created must be
 passed to :meth:`.Operations.batch_alter_table` using
-:paramref:`.Operations.batch_alter_table.copy_from`::
+:paramref:`~.Operations.batch_alter_table.copy_from`::
 
     meta = MetaData()
     some_table = Table(
index 2287b9bcd449f6a6c50fa68ef08e7899de518de4..3c9927cee792148bf02025b7065cbc44964b8db7 100644 (file)
@@ -3,6 +3,33 @@
 ==========
 Changelog
 ==========
+.. changelog::
+    :version: 0.7.1
+
+    .. change::
+      :tags: bug, batch
+      :tickets: 249
+
+      Repaired the inspection, copying and rendering of CHECK constraints
+      and so-called "schema" types such as Boolean, Enum within the batch
+      copy system; the CHECK constraint will not be "doubled" when the table is
+      copied, and additionally the inspection of the CHECK constraint for
+      its member columns will no longer fail with an attribute error.
+
+    .. change::
+      :tags: feature, batch
+
+      Added two new arguments
+      :paramref:`.Operations.batch_alter_table.reflect_args`
+      and :paramref:`.Operations.batch_alter_table.reflect_kwargs`, so that
+      arguments may be passed directly to suit the
+      :class:`~.sqlalchemy.schema.Table`
+      object that will be reflected.
+
+      .. seealso::
+
+        :ref:`batch_controlling_table_reflection`
+
 .. changelog::
     :version: 0.7.0
     :released: November 24, 2014
index 65c59d08a0db8206a304ed21c90679f66b7b4ecc..7512adf1f456d748cd1807b06b704c794f0bc67f 100644 (file)
@@ -19,3 +19,13 @@ class DefaultRequirements(SuiteRequirements):
         """test will fail if referential integrity is enforced"""
 
         return exclusions.fails_on_everything_except("sqlite")
+
+    @property
+    def non_native_boolean(self):
+        """test will fail if native boolean is provided"""
+
+        return exclusions.fails_if(
+            exclusions.LambdaPredicate(
+                lambda config: config.db.dialect.supports_native_boolean
+            )
+        )
index 389295174d843205aa8b9217fb228799fa45ad72..662714e135fd931c203bbd47e10988280d8ab79c 100644 (file)
@@ -10,7 +10,8 @@ from alembic.batch import ApplyBatchImpl
 from alembic.migration import MigrationContext
 
 from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
-    UniqueConstraint, ForeignKeyConstraint, Index
+    UniqueConstraint, ForeignKeyConstraint, Index, Boolean, CheckConstraint, \
+    Enum
 from sqlalchemy.sql import column
 from sqlalchemy.schema import CreateTable, CreateIndex
 
@@ -53,6 +54,30 @@ class BatchApplyTest(TestBase):
         )
         return ApplyBatchImpl(t, table_args, table_kwargs)
 
+    def _literal_ck_fixture(
+            self, copy_from=None, table_args=(), table_kwargs={}):
+        m = MetaData()
+        if copy_from is not None:
+            t = copy_from
+        else:
+            t = Table(
+                'tname', m,
+                Column('id', Integer, primary_key=True),
+                Column('email', String()),
+                CheckConstraint("email LIKE '%@%'")
+            )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
+    def _sql_ck_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('email', String())
+        )
+        t.append_constraint(CheckConstraint(t.c.email.like('%@%')))
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
     def _fk_fixture(self, table_args=(), table_kwargs={}):
         m = MetaData()
         t = Table(
@@ -83,6 +108,33 @@ class BatchApplyTest(TestBase):
         )
         return ApplyBatchImpl(t, table_args, table_kwargs)
 
+    def _boolean_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('flag', Boolean)
+        )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
+    def _boolean_no_ck_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('flag', Boolean(create_constraint=False))
+        )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
+    def _enum_fixture(self, table_args=(), table_kwargs={}):
+        m = MetaData()
+        t = Table(
+            'tname', m,
+            Column('id', Integer, primary_key=True),
+            Column('thing', Enum('a', 'b', 'c'))
+        )
+        return ApplyBatchImpl(t, table_args, table_kwargs)
+
     def _assert_impl(self, impl, colnames=None,
                      ddl_contains=None, ddl_not_contains=None,
                      dialect='default'):
@@ -129,8 +181,8 @@ class BatchApplyTest(TestBase):
                     "CAST(tname.%s AS %s) AS anon_1" % (
                         name, impl.new_table.c[name].type)
                     if (
-                        impl.new_table.c[name].type
-                        is not impl.table.c[name].type)
+                        impl.new_table.c[name].type._type_affinity
+                        is not impl.table.c[name].type._type_affinity)
                     else "tname.%s" % name
                     for name in colnames if name in impl.table.c
                 )
@@ -153,6 +205,92 @@ class BatchApplyTest(TestBase):
         new_table = self._assert_impl(impl)
         eq_(new_table.c.x.name, 'q')
 
+    def test_rename_col_boolean(self):
+        impl = self._boolean_fixture()
+        impl.alter_column('tname', 'flag', name='bflag')
+        new_table = self._assert_impl(
+            impl, ddl_contains="CHECK (bflag IN (0, 1)",
+            colnames=["id", "flag"])
+        eq_(new_table.c.flag.name, 'bflag')
+        eq_(
+            len([
+                const for const
+                in new_table.constraints
+                if isinstance(const, CheckConstraint)]),
+            1)
+
+    def test_rename_col_boolean_no_ck(self):
+        impl = self._boolean_no_ck_fixture()
+        impl.alter_column('tname', 'flag', name='bflag')
+        new_table = self._assert_impl(
+            impl, ddl_not_contains="CHECK",
+            colnames=["id", "flag"])
+        eq_(new_table.c.flag.name, 'bflag')
+        eq_(
+            len([
+                const for const
+                in new_table.constraints
+                if isinstance(const, CheckConstraint)]),
+            0)
+
+    def test_rename_col_enum(self):
+        impl = self._enum_fixture()
+        impl.alter_column('tname', 'thing', name='thang')
+        new_table = self._assert_impl(
+            impl, ddl_contains="CHECK (thang IN ('a', 'b', 'c')",
+            colnames=["id", "thing"])
+        eq_(new_table.c.thing.name, 'thang')
+        eq_(
+            len([
+                const for const
+                in new_table.constraints
+                if isinstance(const, CheckConstraint)]),
+            1)
+
+    def test_rename_col_literal_ck(self):
+        impl = self._literal_ck_fixture()
+        impl.alter_column('tname', 'email', name='emol')
+        new_table = self._assert_impl(
+            # note this is wrong, we don't dig into the SQL
+            impl, ddl_contains="CHECK (email LIKE '%@%')",
+            colnames=["id", "email"])
+        eq_(
+            len([c for c in new_table.constraints
+                if isinstance(c, CheckConstraint)]), 1)
+
+        eq_(new_table.c.email.name, 'emol')
+
+    def test_rename_col_literal_ck_workaround(self):
+        impl = self._literal_ck_fixture(
+            copy_from=Table(
+                'tname', MetaData(),
+                Column('id', Integer, primary_key=True),
+                Column('email', String),
+            ),
+            table_args=[CheckConstraint("emol LIKE '%@%'")])
+
+        impl.alter_column('tname', 'email', name='emol')
+        new_table = self._assert_impl(
+            impl, ddl_contains="CHECK (emol LIKE '%@%')",
+            colnames=["id", "email"])
+        eq_(
+            len([c for c in new_table.constraints
+                if isinstance(c, CheckConstraint)]), 1)
+        eq_(new_table.c.email.name, 'emol')
+
+    def test_rename_col_sql_ck(self):
+        impl = self._sql_ck_fixture()
+
+        impl.alter_column('tname', 'email', name='emol')
+        new_table = self._assert_impl(
+            impl, ddl_contains="CHECK (emol LIKE '%@%')",
+            colnames=["id", "email"])
+        eq_(
+            len([c for c in new_table.constraints
+                if isinstance(c, CheckConstraint)]), 1)
+
+        eq_(new_table.c.email.name, 'emol')
+
     def test_add_col(self):
         impl = self._simple_fixture()
         col = Column('g', Integer)
@@ -428,9 +566,10 @@ class BatchRoundTripTest(TestBase):
         self.metadata.drop_all(self.conn)
         self.conn.close()
 
-    def _assert_data(self, data):
+    def _assert_data(self, data, tablename='foo'):
         eq_(
-            [dict(row) for row in self.conn.execute("select * from foo")],
+            [dict(row) for row
+             in self.conn.execute("select * from %s" % tablename)],
             data
         )
 
@@ -506,6 +645,54 @@ class BatchRoundTripTest(TestBase):
             {"id": 5, "data": "d5", "y": 9}
         ])
 
+    def test_rename_column_boolean(self):
+        bar = Table(
+            'bar', self.metadata,
+            Column('id', Integer, primary_key=True),
+            Column('flag', Boolean()),
+            mysql_engine='InnoDB'
+        )
+        bar.create(self.conn)
+        self.conn.execute(bar.insert(), {'id': 1, 'flag': True})
+        self.conn.execute(bar.insert(), {'id': 2, 'flag': False})
+
+        with self.op.batch_alter_table(
+            "bar"
+        ) as batch_op:
+            batch_op.alter_column(
+                'flag', new_column_name='bflag', existing_type=Boolean)
+
+        self._assert_data([
+            {"id": 1, 'bflag': True},
+            {"id": 2, 'bflag': False},
+        ], 'bar')
+
+    @config.requirements.non_native_boolean
+    def test_rename_column_non_native_boolean_no_ck(self):
+        bar = Table(
+            'bar', self.metadata,
+            Column('id', Integer, primary_key=True),
+            Column('flag', Boolean(create_constraint=False)),
+            mysql_engine='InnoDB'
+        )
+        bar.create(self.conn)
+        self.conn.execute(bar.insert(), {'id': 1, 'flag': True})
+        self.conn.execute(bar.insert(), {'id': 2, 'flag': False})
+        self.conn.execute(bar.insert(), {'id': 3, 'flag': 5})
+
+        with self.op.batch_alter_table(
+            "bar",
+            reflect_args=[Column('flag', Boolean(create_constraint=False))]
+        ) as batch_op:
+            batch_op.alter_column(
+                'flag', new_column_name='bflag', existing_type=Boolean)
+
+        self._assert_data([
+            {"id": 1, 'bflag': True},
+            {"id": 2, 'bflag': False},
+            {'id': 3, 'bflag': 5}
+        ], 'bar')
+
     def test_drop_column_pk(self):
         with self.op.batch_alter_table("foo") as batch_op:
             batch_op.drop_column('id')