]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- round trip tests
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Nov 2014 23:44:43 +0000 (18:44 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 8 Nov 2014 23:44:43 +0000 (18:44 -0500)
- fixes to add column

alembic/batch.py
tests/test_batch.py

index 5f745116163b5ae8eb26faf113c5f27e8bd75ee4..a64d2bcef95f9bcf0120ea617d695a70d1ff7a16 100644 (file)
@@ -1,5 +1,5 @@
 from sqlalchemy import Table, MetaData, Index, select, Column, \
-    ForeignKeyConstraint
+    ForeignKeyConstraint, cast
 from sqlalchemy import types as sqltypes
 from sqlalchemy.util import OrderedDict
 
@@ -194,23 +194,23 @@ class ApplyBatchImpl(object):
     def alter_column(self, table_name, column_name,
                      nullable=None,
                      server_default=False,
-                     new_column_name=None,
+                     name=None,
                      type_=None,
                      autoincrement=None,
                      **kw
                      ):
         existing = self.columns[column_name]
         existing_transfer = self.column_transfers[column_name]
-        if new_column_name is not None and new_column_name != column_name:
+        if name is not None and name != column_name:
             # note that we don't change '.key' - we keep referring
             # to the renamed column by its old key in _create().  neat!
-            existing.name = new_column_name
-            existing_transfer["name"] = new_column_name
+            existing.name = name
+            existing_transfer["name"] = name
 
         if type_ is not None:
             type_ = sqltypes.to_instance(type_)
             existing.type = type_
-            existing_transfer["typecast"] = type_
+            existing_transfer["expr"] = cast(existing_transfer["expr"], type_)
         if nullable is not None:
             existing.nullable = nullable
         if server_default is not False:
@@ -219,7 +219,9 @@ class ApplyBatchImpl(object):
             existing.autoincrement = bool(autoincrement)
 
     def add_column(self, table_name, column, **kw):
-        self.columns[column.name] = column
+        # we copy the column because operations.add_column()
+        # gives us a Column that is part of a Table already.
+        self.columns[column.name] = column.copy(schema=self.table.schema)
         self.column_transfers[column.name] = {}
 
     def drop_column(self, table_name, column, **kw):
index dfddbc59f84b13643d9578d1ffc85ba8f081ec3c..4f2695b0332db4afc84b1de342126e4b4f03953c 100644 (file)
@@ -1,11 +1,12 @@
 from contextlib import contextmanager
 import re
 
-from alembic.testing import TestBase, eq_
+from alembic.testing import TestBase, eq_, config
 from alembic.testing.fixtures import op_fixture
 from alembic.testing import mock
 from alembic.operations import Operations
 from alembic.batch import ApplyBatchImpl
+from alembic.migration import MigrationContext
 
 from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
     UniqueConstraint, ForeignKeyConstraint
@@ -97,10 +98,18 @@ class BatchApplyTest(TestBase):
             'SELECT %(tname_colnames)s FROM tname' % {
                 "colnames": ", ".join([
                     impl.new_table.c[name].name
-                    for name in colnames if name in impl.table.c]),
+                    for name in colnames
+                    if name in impl.table.c]),
                 "tname_colnames":
-                ", ".join("tname.%s" % name
-                          for name in colnames if name in impl.table.c)
+                ", ".join(
+                    "CAST(tname.%s AS %s) AS anon_1" % (
+                        name, impl.new_table.c[name].type)
+                    if (
+                        impl.new_table.c[name].type
+                        is not impl.table.c[name].type)
+                    else "tname.%s" % name
+                    for name in colnames if name in impl.table.c
+                )
             },
             'DROP TABLE tname',
             'ALTER TABLE _alembic_batch_temp RENAME TO tname'
@@ -115,13 +124,22 @@ class BatchApplyTest(TestBase):
 
     def test_rename_col(self):
         impl = self._simple_fixture()
-        impl.alter_column('tname', 'x', new_column_name='q')
+        impl.alter_column('tname', 'x', name='q')
         new_table = self._assert_impl(impl)
         eq_(new_table.c.x.name, 'q')
 
+    def test_add_col(self):
+        impl = self._simple_fixture()
+        col = Column('g', Integer)
+        # operations.add_column produces a table
+        t = self.op._table('tname', col)  # noqa
+        impl.add_column('tname', col)
+        new_table = self._assert_impl(impl, colnames=['id', 'x', 'y', 'g'])
+        eq_(new_table.c.g.name, 'g')
+
     def test_rename_col_pk(self):
         impl = self._simple_fixture()
-        impl.alter_column('tname', 'id', new_column_name='foobar')
+        impl.alter_column('tname', 'id', name='foobar')
         new_table = self._assert_impl(
             impl, ddl_contains="PRIMARY KEY (foobar)")
         eq_(new_table.c.id.name, 'foobar')
@@ -129,7 +147,7 @@ class BatchApplyTest(TestBase):
 
     def test_rename_col_fk(self):
         impl = self._fk_fixture()
-        impl.alter_column('tname', 'user_id', new_column_name='foobar')
+        impl.alter_column('tname', 'user_id', name='foobar')
         new_table = self._assert_impl(
             impl, colnames=['id', 'email', 'user_id'],
             ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
@@ -329,3 +347,126 @@ class BatchAPITest(TestBase):
             batch.impl.operations.impl.mock_calls,
             [mock.call.drop_constraint(self.mock_schema.Constraint())]
         )
+
+
+class BatchRoundTripTest(TestBase):
+    __only_on__ = "sqlite"
+
+    def setUp(self):
+        self.conn = config.db.connect()
+        t1 = Table(
+            'foo', MetaData(),
+            Column('id', Integer, primary_key=True),
+            Column('data', String(50)),
+            Column('x', Integer)
+        )
+        t1.create(self.conn)
+        self.conn.execute(
+            t1.insert(),
+            [
+                {"id": 1, "data": "d1", "x": 5},
+                {"id": 2, "data": "22", "x": 6},
+                {"id": 3, "data": "8.5", "x": 7},
+                {"id": 4, "data": "9.46", "x": 8},
+                {"id": 5, "data": "d5", "x": 9}
+            ]
+        )
+        context = MigrationContext.configure(self.conn)
+        self.op = Operations(context)
+
+    def tearDown(self):
+        self.conn.execute("drop table foo")
+        self.conn.close()
+
+    def _assert_data(self, data):
+        eq_(
+            [dict(row) for row in self.conn.execute("select * from foo")],
+            data
+        )
+
+    def test_change_type(self):
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.alter_column('data', type_=Integer)
+
+        self._assert_data([
+            {"id": 1, "data": 0, "x": 5},
+            {"id": 2, "data": 22, "x": 6},
+            {"id": 3, "data": 8, "x": 7},
+            {"id": 4, "data": 9, "x": 8},
+            {"id": 5, "data": 0, "x": 9}
+        ])
+
+    def test_drop_column(self):
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.drop_column('data')
+
+        self._assert_data([
+            {"id": 1, "x": 5},
+            {"id": 2, "x": 6},
+            {"id": 3, "x": 7},
+            {"id": 4, "x": 8},
+            {"id": 5, "x": 9}
+        ])
+
+    def test_rename_column(self):
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.alter_column('x', new_column_name='y')
+
+        self._assert_data([
+            {"id": 1, "data": "d1", "y": 5},
+            {"id": 2, "data": "22", "y": 6},
+            {"id": 3, "data": "8.5", "y": 7},
+            {"id": 4, "data": "9.46", "y": 8},
+            {"id": 5, "data": "d5", "y": 9}
+        ])
+
+    def test_drop_column_pk(self):
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.drop_column('id')
+
+        self._assert_data([
+            {"data": "d1", "x": 5},
+            {"data": "22", "x": 6},
+            {"data": "8.5", "x": 7},
+            {"data": "9.46", "x": 8},
+            {"data": "d5", "x": 9}
+        ])
+
+    def test_rename_column_pk(self):
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.alter_column('id', new_column_name='ident')
+
+        self._assert_data([
+            {"ident": 1, "data": "d1", "x": 5},
+            {"ident": 2, "data": "22", "x": 6},
+            {"ident": 3, "data": "8.5", "x": 7},
+            {"ident": 4, "data": "9.46", "x": 8},
+            {"ident": 5, "data": "d5", "x": 9}
+        ])
+
+    def test_add_column_auto(self):
+        # note this uses ALTER
+        with self.op.batch_alter_table("foo") as batch_op:
+            batch_op.add_column(
+                Column('data2', String(50), server_default='hi'))
+
+        self._assert_data([
+            {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
+            {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
+            {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
+            {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
+            {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
+        ])
+
+    def test_add_column_recreate(self):
+        with self.op.batch_alter_table("foo", recreate='always') as batch_op:
+            batch_op.add_column(
+                Column('data2', String(50), server_default='hi'))
+
+        self._assert_data([
+            {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
+            {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
+            {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
+            {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
+            {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
+        ])