]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- flake8 test_reflection and rework the type-based tests into
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 7 Dec 2015 22:09:11 +0000 (17:09 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 7 Dec 2015 22:09:11 +0000 (17:09 -0500)
individual categories w/ modernized fixtures, assert attributes
within type objects.

test/dialect/mysql/test_reflection.py

index 266fc335eb8f9b1046dbc6e7260ebe5fa42c0a41..024567828f071e3792c9aee68c8ce6b740a88857 100644 (file)
 # coding: utf-8
 
-from sqlalchemy.testing import eq_
-from sqlalchemy import *
+from sqlalchemy.testing import eq_, is_
+from sqlalchemy import Column, Table, DDL, MetaData, TIMESTAMP, \
+    DefaultClause, String, Integer, Text, UnicodeText, SmallInteger,\
+    NCHAR, LargeBinary, DateTime, select, UniqueConstraint, Unicode,\
+    BigInteger
+from sqlalchemy import event
 from sqlalchemy import sql
+from sqlalchemy import inspect
 from sqlalchemy.dialects.mysql import base as mysql
 from sqlalchemy.testing import fixtures, AssertsExecutionResults
 from sqlalchemy import testing
 
 
+class TypeReflectionTest(fixtures.TestBase):
+    __only_on__ = 'mysql'
+    __backend__ = True
+
+    @testing.provide_metadata
+    def _run_test(self, specs, attributes):
+        columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
+
+        # Early 5.0 releases seem to report more "general" for columns
+        # in a view, e.g. char -> varchar, tinyblob -> mediumblob
+        use_views = testing.db.dialect.server_version_info > (5, 0, 10)
+
+        m = self.metadata
+        Table('mysql_types', m, *columns)
+
+        if use_views:
+            event.listen(
+                m, 'after_create',
+                DDL(
+                    'CREATE OR REPLACE VIEW mysql_types_v '
+                    'AS SELECT * from mysql_types')
+            )
+            event.listen(
+                m, 'before_drop',
+                DDL("DROP VIEW mysql_types_v")
+            )
+        m.create_all()
+
+        m2 = MetaData(testing.db)
+        tables = [
+            Table('mysql_types', m2, autoload=True)
+        ]
+        if use_views:
+            tables.append(Table('mysql_types_v', m2, autoload=True))
+
+        for table in tables:
+            for i, (reflected_col, spec) in enumerate(zip(table.c, specs)):
+                expected_spec = spec[1]
+                reflected_type = reflected_col.type
+                is_(type(reflected_type), type(expected_spec))
+
+                for attr in attributes:
+                    eq_(
+                        getattr(reflected_type, attr),
+                        getattr(expected_spec, attr),
+                        "Column %s: Attribute %s value of %s does not "
+                        "match %s for type %s" % (
+                            "c%i" % (i + 1),
+                            attr,
+                            getattr(reflected_type, attr),
+                            getattr(expected_spec, attr),
+                            spec[0]
+                        )
+                    )
+
+    def test_time_types(self):
+        specs = []
+
+        for type_ in (mysql.TIMESTAMP, mysql.DATETIME, mysql.TIME):
+            typespec = type_()
+            specs.append((typespec, typespec))
+
+        specs.extend([
+            (TIMESTAMP(), mysql.TIMESTAMP()),
+            (DateTime(), mysql.DATETIME()),
+        ])
+
+        # note 'timezone' should always be None on both
+        self._run_test(specs, ['fsp', 'timezone'])
+
+    def test_year_types(self):
+        specs = [
+            (mysql.YEAR(), mysql.YEAR(display_width=4)),
+            (mysql.YEAR(display_width=2), mysql.YEAR(display_width=2)),
+            (mysql.YEAR(display_width=4), mysql.YEAR(display_width=4)),
+        ]
+
+        self._run_test(specs, ['display_width'])
+
+    def test_string_types(self):
+        specs = [
+            (String(1), mysql.MSString(1)),
+            (String(3), mysql.MSString(3)),
+            (Text(), mysql.MSText()),
+            (Unicode(1), mysql.MSString(1)),
+            (Unicode(3), mysql.MSString(3)),
+            (UnicodeText(), mysql.MSText()),
+            (mysql.MSChar(1), mysql.MSChar(1)),
+            (mysql.MSChar(3), mysql.MSChar(3)),
+            (NCHAR(2), mysql.MSChar(2)),
+            (mysql.MSNChar(2), mysql.MSChar(2)),
+            (mysql.MSNVarChar(22), mysql.MSString(22),),
+        ]
+        self._run_test(specs, ['length'])
+
+    def test_integer_types(self):
+        specs = []
+        for type_ in [
+                mysql.TINYINT, mysql.SMALLINT,
+                mysql.MEDIUMINT, mysql.INTEGER, mysql.BIGINT]:
+            for display_width in [None, 4, 7]:
+                for unsigned in [False, True]:
+                    for zerofill in [None, True]:
+                        kw = {}
+                        if display_width:
+                            kw['display_width'] = display_width
+                        if unsigned is not None:
+                            kw['unsigned'] = unsigned
+                        if zerofill is not None:
+                            kw['zerofill'] = zerofill
+
+                        zerofill = bool(zerofill)
+                        source_type = type_(**kw)
+
+                        if display_width is None:
+                            display_width = {
+                                mysql.MEDIUMINT: 9,
+                                mysql.SMALLINT: 6,
+                                mysql.TINYINT: 4,
+                                mysql.INTEGER: 11,
+                                mysql.BIGINT: 20
+                            }[type_]
+
+                        if zerofill:
+                            unsigned = True
+
+                        expected_type = type_(
+                            display_width=display_width,
+                            unsigned=unsigned,
+                            zerofill=zerofill
+                        )
+                        specs.append(
+                            (source_type, expected_type)
+                        )
+
+        specs.extend([
+            (SmallInteger(), mysql.SMALLINT(display_width=6)),
+            (Integer(), mysql.INTEGER(display_width=11)),
+            (BigInteger, mysql.BIGINT(display_width=20))
+        ])
+        self._run_test(specs, ['display_width', 'unsigned', 'zerofill'])
+
+    def test_binary_types(self):
+        specs = [
+            (LargeBinary(3), mysql.TINYBLOB(), ),
+            (LargeBinary(), mysql.BLOB()),
+            (mysql.MSBinary(3), mysql.MSBinary(3), ),
+            (mysql.MSVarBinary(3), mysql.MSVarBinary(3)),
+            (mysql.MSTinyBlob(), mysql.MSTinyBlob()),
+            (mysql.MSBlob(), mysql.MSBlob()),
+            (mysql.MSBlob(1234), mysql.MSBlob()),
+            (mysql.MSMediumBlob(), mysql.MSMediumBlob()),
+            (mysql.MSLongBlob(), mysql.MSLongBlob()),
+        ]
+        self._run_test(specs, [])
+
+    @testing.uses_deprecated('Manually quoting ENUM value literals')
+    def test_legacy_enum_types(self):
+
+        specs = [
+            (mysql.ENUM("''","'fleem'"), mysql.ENUM("''","'fleem'")),  # noqa
+        ]
+
+        self._run_test(specs, ['enums'])
+
+
 class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
 
     __only_on__ = 'mysql'
@@ -75,7 +246,8 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
     def test_reflection_with_table_options(self):
         comment = r"""Comment types type speedily ' " \ '' Fun!"""
 
-        def_table = Table('mysql_def', MetaData(testing.db),
+        def_table = Table(
+            'mysql_def', MetaData(testing.db),
             Column('c1', Integer()),
             mysql_engine='MEMORY',
             mysql_comment=comment,
@@ -88,8 +260,9 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
 
         def_table.create()
         try:
-            reflected = Table('mysql_def', MetaData(testing.db),
-                          autoload=True)
+            reflected = Table(
+                'mysql_def', MetaData(testing.db),
+                autoload=True)
         finally:
             def_table.drop()
 
@@ -108,15 +281,16 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
         assert reflected.kwargs['mysql_connection'] == 'fish'
 
         # This field doesn't seem to be returned by mysql itself.
-        #assert reflected.kwargs['mysql_password'] == 'secret'
+        # assert reflected.kwargs['mysql_password'] == 'secret'
 
         # This is explicitly ignored when reflecting schema.
-        #assert reflected.kwargs['mysql_auto_increment'] == '5'
+        # assert reflected.kwargs['mysql_auto_increment'] == '5'
 
     def test_reflection_on_include_columns(self):
         """Test reflection of include_columns to be sure they respect case."""
 
-        case_table = Table('mysql_case', MetaData(testing.db),
+        case_table = Table(
+            'mysql_case', MetaData(testing.db),
             Column('c1', String(10)),
             Column('C2', String(10)),
             Column('C3', String(10)))
@@ -128,85 +302,15 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
             for t in case_table, reflected:
                 assert 'c1' in t.c.keys()
                 assert 'C2' in t.c.keys()
-            reflected2 = Table('mysql_case', MetaData(testing.db),
-                              autoload=True, include_columns=['c1', 'c2'])
+            reflected2 = Table(
+                'mysql_case', MetaData(testing.db),
+                autoload=True, include_columns=['c1', 'c2'])
             assert 'c1' in reflected2.c.keys()
             for c in ['c2', 'C2', 'C3']:
                 assert c not in reflected2.c.keys()
         finally:
             case_table.drop()
 
-    @testing.exclude('mysql', '<', (5, 0, 0), 'early types are squirrely')
-    @testing.uses_deprecated('Using String type with no length')
-    @testing.uses_deprecated('Manually quoting ENUM value literals')
-    def test_type_reflection(self):
-        # (ask_for, roundtripped_as_if_different)
-        specs = [(String(1), mysql.MSString(1), ),
-                 (String(3), mysql.MSString(3), ),
-                 (Text(), mysql.MSText(), ),
-                 (Unicode(1), mysql.MSString(1), ),
-                 (Unicode(3), mysql.MSString(3), ),
-                 (UnicodeText(), mysql.MSText(), ),
-                 (mysql.MSChar(1), ),
-                 (mysql.MSChar(3), ),
-                 (NCHAR(2), mysql.MSChar(2), ),
-                 (mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only
-                 (mysql.MSNVarChar(22), mysql.MSString(22), ),
-                 (SmallInteger(), mysql.MSSmallInteger(), ),
-                 (SmallInteger(), mysql.MSSmallInteger(4), ),
-                 (mysql.MSSmallInteger(), ),
-                 (mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
-                 (mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
-                 (mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
-                 (LargeBinary(3), mysql.TINYBLOB(), ),
-                 (LargeBinary(), mysql.BLOB() ),
-                 (mysql.MSBinary(3), mysql.MSBinary(3), ),
-                 (mysql.MSVarBinary(3),),
-                 (mysql.MSTinyBlob(),),
-                 (mysql.MSBlob(),),
-                 (mysql.MSBlob(1234), mysql.MSBlob()),
-                 (mysql.MSMediumBlob(),),
-                 (mysql.MSLongBlob(),),
-                 (mysql.ENUM("''","'fleem'"), ),
-                 ]
-
-        columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
-
-        db = testing.db
-        m = MetaData(db)
-        t_table = Table('mysql_types', m, *columns)
-        try:
-            m.create_all()
-
-            m2 = MetaData(db)
-            rt = Table('mysql_types', m2, autoload=True)
-            try:
-                db.execute('CREATE OR REPLACE VIEW mysql_types_v '
-                           'AS SELECT * from mysql_types')
-                rv = Table('mysql_types_v', m2, autoload=True)
-
-                expected = [len(c) > 1 and c[1] or c[0] for c in specs]
-
-                # Early 5.0 releases seem to report more "general" for columns
-                # in a view, e.g. char -> varchar, tinyblob -> mediumblob
-                #
-                # Not sure exactly which point version has the fix.
-                if db.dialect.server_version_info < (5, 0, 11):
-                    tables = rt,
-                else:
-                    tables = rt, rv
-
-                for table in tables:
-                    for i, reflected in enumerate(table.c):
-                        assert isinstance(reflected.type,
-                                type(expected[i])), \
-                            'element %d: %r not instance of %r' % (i,
-                                reflected.type, type(expected[i]))
-            finally:
-                db.execute('DROP VIEW mysql_types_v')
-        finally:
-            m.drop_all()
-
     def test_autoincrement(self):
         meta = MetaData(testing.db)
         try:
@@ -315,7 +419,7 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
             ["t TIMESTAMP"],
             ["u TIMESTAMP DEFAULT CURRENT_TIMESTAMP"]
         ]):
-            Table("nn_t%d" % idx, meta) # to allow DROP
+            Table("nn_t%d" % idx, meta)  # to allow DROP
 
             testing.db.execute("""
                 CREATE TABLE nn_t%d (
@@ -386,7 +490,8 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
 class RawReflectionTest(fixtures.TestBase):
     def setup(self):
         dialect = mysql.dialect()
-        self.parser = mysql.MySQLTableDefinitionParser(dialect, dialect.identifier_preparer)
+        self.parser = mysql.MySQLTableDefinitionParser(
+            dialect, dialect.identifier_preparer)
 
     def test_key_reflection(self):
         regex = self.parser._re_key
@@ -397,10 +502,14 @@ class RawReflectionTest(fixtures.TestBase):
         assert regex.match('  PRIMARY KEY (`id`)')
         assert regex.match('  PRIMARY KEY USING BTREE (`id`)')
         assert regex.match('  PRIMARY KEY (`id`) USING BTREE')
-        assert regex.match('  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE 16')
-        assert regex.match('  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE=16')
-        assert regex.match('  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE  = 16')
-        assert not regex.match('  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = = 16')
+        assert regex.match(
+            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE 16')
+        assert regex.match(
+            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE=16')
+        assert regex.match(
+            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE  = 16')
+        assert not regex.match(
+            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = = 16')
 
     def test_fk_reflection(self):
         regex = self.parser._re_constraint