]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- ensure that all ops have full backwards/forwards maintenance of
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Aug 2015 02:25:41 +0000 (22:25 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Aug 2015 02:25:41 +0000 (22:25 -0400)
the original object passed in from autogenerate.  In particular this
ensures that the diff structure from compare_metadata is fully backwards compatible
with no chance of synthesized objects.

alembic/operations/ops.py
alembic/testing/__init__.py
alembic/testing/assertions.py
tests/test_autogen_diffs.py

index 314b49b1e9f00d2bde4acef2da813f9d90b0f91c..8735931efd87b62e5d888cbd448614d8ddc6fd98 100644 (file)
@@ -179,11 +179,12 @@ class CreatePrimaryKeyOp(AddConstraintOp):
 
     def __init__(
             self, constraint_name, table_name, columns,
-            schema=None, **kw):
+            schema=None, _orig_constraint=None, **kw):
         self.constraint_name = constraint_name
         self.table_name = table_name
         self.columns = columns
         self.schema = schema
+        self._orig_constraint = _orig_constraint
         self.kw = kw
 
     @classmethod
@@ -193,11 +194,15 @@ class CreatePrimaryKeyOp(AddConstraintOp):
         return cls(
             constraint.name,
             constraint_table.name,
+            constraint.columns,
             schema=constraint_table.schema,
-            *constraint.columns
+            _orig_constraint=constraint
         )
 
     def to_constraint(self, migration_context=None):
+        if self._orig_constraint is not None:
+            return self._orig_constraint
+
         schema_obj = schemaobj.SchemaObjects(migration_context)
         return schema_obj.primary_key_constraint(
             self.constraint_name, self.table_name,
@@ -289,11 +294,12 @@ class CreateUniqueConstraintOp(AddConstraintOp):
 
     def __init__(
             self, constraint_name, table_name,
-            columns, schema=None, **kw):
+            columns, schema=None, _orig_constraint=None, **kw):
         self.constraint_name = constraint_name
         self.table_name = table_name
         self.columns = columns
         self.schema = schema
+        self._orig_constraint = _orig_constraint
         self.kw = kw
 
     @classmethod
@@ -311,10 +317,14 @@ class CreateUniqueConstraintOp(AddConstraintOp):
             constraint_table.name,
             [c.name for c in constraint.columns],
             schema=constraint_table.schema,
+            _orig_constraint=constraint,
             **kw
         )
 
     def to_constraint(self, migration_context=None):
+        if self._orig_constraint is not None:
+            return self._orig_constraint
+
         schema_obj = schemaobj.SchemaObjects(migration_context)
         return schema_obj.unique_constraint(
             self.constraint_name, self.table_name, self.columns,
@@ -421,12 +431,13 @@ class CreateForeignKeyOp(AddConstraintOp):
 
     def __init__(
             self, constraint_name, source_table, referent_table, local_cols,
-            remote_cols, **kw):
+            remote_cols, _orig_constraint=None, **kw):
         self.constraint_name = constraint_name
         self.source_table = source_table
         self.referent_table = referent_table
         self.local_cols = local_cols
         self.remote_cols = remote_cols
+        self._orig_constraint = _orig_constraint
         self.kw = kw
 
     def to_diff_tuple(self):
@@ -459,10 +470,13 @@ class CreateForeignKeyOp(AddConstraintOp):
             target_table,
             source_columns,
             target_columns,
+            _orig_constraint=constraint,
             **kw
         )
 
     def to_constraint(self, migration_context=None):
+        if self._orig_constraint is not None:
+            return self._orig_constraint
         schema_obj = schemaobj.SchemaObjects(migration_context)
         return schema_obj.foreign_key_constraint(
             self.constraint_name,
@@ -606,11 +620,13 @@ class CreateCheckConstraintOp(AddConstraintOp):
     constraint_type = "check"
 
     def __init__(
-            self, constraint_name, table_name, condition, schema=None, **kw):
+            self, constraint_name, table_name,
+            condition, schema=None, _orig_constraint=None, **kw):
         self.constraint_name = constraint_name
         self.table_name = table_name
         self.condition = condition
         self.schema = schema
+        self._orig_constraint = _orig_constraint
         self.kw = kw
 
     @classmethod
@@ -620,11 +636,14 @@ class CreateCheckConstraintOp(AddConstraintOp):
         return cls(
             constraint.name,
             constraint_table.name,
-            constraint.condition,
+            constraint.sqltext,
             schema=constraint_table.schema,
+            _orig_constraint=constraint
         )
 
     def to_constraint(self, migration_context=None):
+        if self._orig_constraint is not None:
+            return self._orig_constraint
         schema_obj = schemaobj.SchemaObjects(migration_context)
         return schema_obj.check_constraint(
             self.constraint_name, self.table_name,
@@ -1444,6 +1463,9 @@ class AddColumnOp(AlterTableOp):
     def to_diff_tuple(self):
         return ("add_column", self.schema, self.table_name, self.column)
 
+    def to_column(self):
+        return self.column
+
     @classmethod
     def from_column(cls, col):
         return cls(col.table.name, col, schema=col.table.schema)
@@ -1558,6 +1580,8 @@ class DropColumnOp(AlterTableOp):
         return cls(tname, col.name, schema=schema, _orig_column=col)
 
     def to_column(self, migration_context=None):
+        if self._orig_column is not None:
+            return self._orig_column
         schema_obj = schemaobj.SchemaObjects(migration_context)
         return schema_obj.column(self.column_name, NULLTYPE)
 
index b14fb8816e6807596d6f2b34843ccb5f1ef59b66..553f501b901a2adeb766ba751db378ce05cc7750 100644 (file)
@@ -1,5 +1,5 @@
 from .fixtures import TestBase
-from .assertions import eq_, ne_, is_, assert_raises_message, \
+from .assertions import eq_, ne_, is_, is_not_, assert_raises_message, \
     eq_ignore_whitespace, assert_raises
 
 from .util import provide_metadata
index 6acca2166da829ed23c7d6aa34e03362c2771b34..b64725ce8604a86ce9d1f8aff7e3d19fd6afa0a9 100644 (file)
@@ -25,6 +25,10 @@ if not util.sqla_094:
         """Assert a is b, with repr messaging on failure."""
         assert a is b, msg or "%r is not %r" % (a, b)
 
+    def is_not_(a, b, msg=None):
+        """Assert a is not b, with repr messaging on failure."""
+        assert a is not b, msg or "%r is %r" % (a, b)
+
     def assert_raises(except_cls, callable_, *args, **kw):
         try:
             callable_(*args, **kw)
@@ -45,7 +49,7 @@ if not util.sqla_094:
             print(text_type(e).encode('utf-8'))
 
 else:
-    from sqlalchemy.testing.assertions import eq_, ne_, is_, \
+    from sqlalchemy.testing.assertions import eq_, ne_, is_, is_not_, \
         assert_raises_message, assert_raises
 
 
index d176b913a2c1203a531b85e7cab4764e31696065..ada6210271412d9d8c41b98919581dd67ca41bc0 100644 (file)
@@ -2,7 +2,8 @@ import sys
 
 from sqlalchemy import MetaData, Column, Table, Integer, String, Text, \
     Numeric, CHAR, ForeignKey, INTEGER, Index, UniqueConstraint, \
-    TypeDecorator, CheckConstraint, text, PrimaryKeyConstraint
+    TypeDecorator, CheckConstraint, text, PrimaryKeyConstraint, \
+    ForeignKeyConstraint
 from sqlalchemy.types import NULLTYPE
 from sqlalchemy.engine.reflection import Inspector
 
@@ -13,7 +14,7 @@ from alembic.testing import TestBase
 from alembic.testing import config
 from alembic.testing import assert_raises_message
 from alembic.testing.mock import Mock
-from alembic.testing import eq_
+from alembic.testing import eq_, is_, is_not_
 from alembic.util import CommandError
 from ._autogen_fixtures import AutogenTest, AutogenFixtureTest
 
@@ -324,6 +325,10 @@ class AutogenerateDiffTest(ModelOne, AutogenTest, TestBase):
 
         eq_(diffs[10][0], 'remove_column')
         eq_(diffs[10][3].name, 'pw')
+        eq_(diffs[10][3].table.name, 'user')
+        assert isinstance(
+            diffs[10][3].type, String
+        )
 
     def test_include_symbol(self):
 
@@ -963,3 +968,129 @@ class PGCompareMetaData(ModelOne, AutogenTest, TestBase):
         eq_(diffs[5][0][0], 'modify_nullable')
         eq_(diffs[5][0][5], False)
         eq_(diffs[5][0][6], True)
+
+
+class OrigObjectTest(TestBase):
+    def setUp(self):
+        self.metadata = m = MetaData()
+        t = Table(
+            't', m,
+            Column('id', Integer(), primary_key=True),
+            Column('x', Integer())
+        )
+        self.ix = Index('ix1', t.c.id)
+        fk = ForeignKeyConstraint(['t_id'], ['t.id'])
+        q = Table(
+            'q', m,
+            Column('t_id', Integer()),
+            fk
+        )
+        self.table = t
+        self.fk = fk
+        self.ck = CheckConstraint(t.c.x > 5)
+        self.uq = UniqueConstraint(q.c.t_id)
+        self.pk = t.primary_key
+
+    def test_drop_fk(self):
+        fk = self.fk
+        op = ops.DropConstraintOp.from_constraint(fk)
+        is_(op.to_constraint(), fk)
+        is_(op.reverse().to_constraint(), fk)
+
+    def test_add_fk(self):
+        fk = self.fk
+        op = ops.AddConstraintOp.from_constraint(fk)
+        is_(op.to_constraint(), fk)
+        is_(op.reverse().to_constraint(), fk)
+        is_not_(None, op.to_constraint().table)
+
+    def test_add_check(self):
+        ck = self.ck
+        op = ops.AddConstraintOp.from_constraint(ck)
+        is_(op.to_constraint(), ck)
+        is_(op.reverse().to_constraint(), ck)
+        is_not_(None, op.to_constraint().table)
+
+    def test_drop_check(self):
+        ck = self.ck
+        op = ops.DropConstraintOp.from_constraint(ck)
+        is_(op.to_constraint(), ck)
+        is_(op.reverse().to_constraint(), ck)
+        is_not_(None, op.to_constraint().table)
+
+    def test_add_unique(self):
+        uq = self.uq
+        op = ops.AddConstraintOp.from_constraint(uq)
+        is_(op.to_constraint(), uq)
+        is_(op.reverse().to_constraint(), uq)
+        is_not_(None, op.to_constraint().table)
+
+    def test_drop_unique(self):
+        uq = self.uq
+        op = ops.DropConstraintOp.from_constraint(uq)
+        is_(op.to_constraint(), uq)
+        is_(op.reverse().to_constraint(), uq)
+        is_not_(None, op.to_constraint().table)
+
+    def test_add_pk_no_orig(self):
+        op = ops.CreatePrimaryKeyOp('pk1', 't', ['x', 'y'])
+        pk = op.to_constraint()
+        eq_(pk.name, 'pk1')
+        eq_(pk.table.name, 't')
+
+    def test_add_pk(self):
+        pk = self.pk
+        op = ops.AddConstraintOp.from_constraint(pk)
+        is_(op.to_constraint(), pk)
+        is_(op.reverse().to_constraint(), pk)
+        is_not_(None, op.to_constraint().table)
+
+    def test_drop_pk(self):
+        pk = self.pk
+        op = ops.DropConstraintOp.from_constraint(pk)
+        is_(op.to_constraint(), pk)
+        is_(op.reverse().to_constraint(), pk)
+        is_not_(None, op.to_constraint().table)
+
+    def test_drop_column(self):
+        t = self.table
+
+        op = ops.DropColumnOp.from_column_and_tablename(None, 't', t.c.x)
+        is_(op.to_column(), t.c.x)
+        is_(op.reverse().to_column(), t.c.x)
+        is_not_(None, op.to_column().table)
+
+    def test_add_column(self):
+        t = self.table
+
+        op = ops.AddColumnOp.from_column_and_tablename(None, 't', t.c.x)
+        is_(op.to_column(), t.c.x)
+        is_(op.reverse().to_column(), t.c.x)
+        is_not_(None, op.to_column().table)
+
+    def test_drop_table(self):
+        t = self.table
+
+        op = ops.DropTableOp.from_table(t)
+        is_(op.to_table(), t)
+        is_(op.reverse().to_table(), t)
+        is_(self.metadata, op.to_table().metadata)
+
+    def test_add_table(self):
+        t = self.table
+
+        op = ops.CreateTableOp.from_table(t)
+        is_(op.to_table(), t)
+        is_(op.reverse().to_table(), t)
+        is_(self.metadata, op.to_table().metadata)
+
+    def test_drop_index(self):
+        op = ops.DropIndexOp.from_index(self.ix)
+        is_(op.to_index(), self.ix)
+        is_(op.reverse().to_index(), self.ix)
+
+    def test_create_index(self):
+        op = ops.CreateIndexOp.from_index(self.ix)
+        is_(op.to_index(), self.ix)
+        is_(op.reverse().to_index(), self.ix)
+