]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
PG dialect test fixes
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 2 Oct 2017 20:50:09 +0000 (16:50 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 2 Oct 2017 23:03:40 +0000 (19:03 -0400)
Make sure we clear the plugin registry before testing
that the "postgres" name raises.   Also move non-backend
tests out of MiscTest into a new suite.

Change-Id: Icd1bb4745aa07f52d585fcf959f76fcd8bdc7f24

lib/sqlalchemy/util/langhelpers.py
test/dialect/postgresql/test_dialect.py

index 9ca19f13812b028c94fbf9b17e64abe031754859..c90431d337abf09373aa007815e2fde5a7953aab 100644 (file)
@@ -193,6 +193,9 @@ class PluginLoader(object):
         self.impls = {}
         self.auto_fn = auto_fn
 
+    def clear(self):
+        self.impls.clear()
+
     def load(self, name):
         if name in self.impls:
             return self.impls[name]()
index 27e0ac065946dd985d2299a9abb92acfede5e026..e985718c772cdfb593b82afaefa0351a83a254cc 100644 (file)
@@ -20,33 +20,19 @@ from sqlalchemy.engine import url
 from sqlalchemy.testing import is_
 from sqlalchemy.testing import expect_deprecated
 from ...engine import test_execute
+from sqlalchemy import dialects
 
 
-class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class DialectTest(fixtures.TestBase):
+    """python-side dialect tests.  """
 
-    __only_on__ = 'postgresql'
-    __backend__ = True
-
-    @testing.provide_metadata
-    def test_date_reflection(self):
-        metadata = self.metadata
-        Table(
-            'pgdate', metadata, Column('date1', DateTime(timezone=True)),
-            Column('date2', DateTime(timezone=False)))
-        metadata.create_all()
-        m2 = MetaData(testing.db)
-        t2 = Table('pgdate', m2, autoload=True)
-        assert t2.c.date1.type.timezone is True
-        assert t2.c.date2.type.timezone is False
-
-    @testing.fails_on('+zxjdbc',
-                      'The JDBC driver handles the version parsing')
     def test_version_parsing(self):
 
         def mock_conn(res):
             return Mock(
                 execute=Mock(return_value=Mock(scalar=Mock(return_value=res))))
 
+        dialect = postgresql.dialect()
         for string, version in [
                 (
                     'PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
@@ -70,9 +56,52 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
                     'compiled by gcc (GCC) 4.8.5 20150623 '
                     '(Red Hat 4.8.5-11), 64-bit', (10,))
         ]:
-            eq_(testing.db.dialect._get_server_version_info(mock_conn(string)),
+            eq_(dialect._get_server_version_info(mock_conn(string)),
                 version)
 
+    def test_deprecated_dialect_name_still_loads(self):
+        dialects.registry.clear()
+        with expect_deprecated(
+                "The 'postgres' dialect name "
+                "has been renamed to 'postgresql'"):
+            dialect = url.URL("postgres").get_dialect()
+        is_(dialect, postgresql.dialect)
+
+    @testing.requires.psycopg2_compatibility
+    def test_pg_dialect_use_native_unicode_from_config(self):
+        config = {
+            'sqlalchemy.url': testing.db.url,
+            'sqlalchemy.use_native_unicode': "false"}
+
+        e = engine_from_config(config, _initialize=False)
+        eq_(e.dialect.use_native_unicode, False)
+
+        config = {
+            'sqlalchemy.url': testing.db.url,
+            'sqlalchemy.use_native_unicode': "true"}
+
+        e = engine_from_config(config, _initialize=False)
+        eq_(e.dialect.use_native_unicode, True)
+
+
+class MiscBackendTest(
+        fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
+    __only_on__ = 'postgresql'
+    __backend__ = True
+
+    @testing.provide_metadata
+    def test_date_reflection(self):
+        metadata = self.metadata
+        Table(
+            'pgdate', metadata, Column('date1', DateTime(timezone=True)),
+            Column('date2', DateTime(timezone=False)))
+        metadata.create_all()
+        m2 = MetaData(testing.db)
+        t2 = Table('pgdate', m2, autoload=True)
+        assert t2.c.date1.type.timezone is True
+        assert t2.c.date2.type.timezone is False
+
     @testing.requires.psycopg2_compatibility
     def test_psycopg2_version(self):
         v = testing.db.dialect.psycopg2_version
@@ -92,13 +121,6 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
             psycopg2.Error)
         assert isinstance(exception, exc.OperationalError)
 
-    def test_deprecated_dialect_name_still_loads(self):
-        with expect_deprecated(
-                "The 'postgres' dialect name "
-                "has been renamed to 'postgresql'"):
-            dialect = url.URL("postgres").get_dialect()
-        is_(dialect, postgresql.dialect)
-
     # currently not passing with pg 9.3 that does not seem to generate
     # any notices here, would rather find a way to mock this
     @testing.requires.no_coverage
@@ -142,22 +164,6 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
         new_encoding = c.execute("show client_encoding").fetchone()[0]
         eq_(new_encoding, test_encoding)
 
-    @testing.requires.psycopg2_compatibility
-    def test_pg_dialect_use_native_unicode_from_config(self):
-        config = {
-            'sqlalchemy.url': testing.db.url,
-            'sqlalchemy.use_native_unicode': "false"}
-
-        e = engine_from_config(config, _initialize=False)
-        eq_(e.dialect.use_native_unicode, False)
-
-        config = {
-            'sqlalchemy.url': testing.db.url,
-            'sqlalchemy.use_native_unicode': "true"}
-
-        e = engine_from_config(config, _initialize=False)
-        eq_(e.dialect.use_native_unicode, True)
-
     @testing.requires.psycopg2_or_pg8000_compatibility
     @engines.close_open_connections
     def test_autocommit_isolation_level(self):
@@ -186,45 +192,41 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
             ).scalar()
             eq_(r, exp)
 
+    @testing.provide_metadata
     def test_checksfor_sequence(self):
-        meta1 = MetaData(testing.db)
+        meta1 = self.metadata
         seq = Sequence('fooseq')
         t = Table(
             'mytable', meta1,
             Column('col1', Integer, seq)
         )
         seq.drop()
-        try:
-            testing.db.execute('CREATE SEQUENCE fooseq')
-            t.create(checkfirst=True)
-        finally:
-            t.drop(checkfirst=True)
+        testing.db.execute('CREATE SEQUENCE fooseq')
+        t.create(checkfirst=True)
 
+    @testing.provide_metadata
     def test_schema_roundtrips(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         users = Table(
             'users', meta, Column(
                 'id', Integer, primary_key=True), Column(
                 'name', String(50)), schema='test_schema')
         users.create()
-        try:
-            users.insert().execute(id=1, name='name1')
-            users.insert().execute(id=2, name='name2')
-            users.insert().execute(id=3, name='name3')
-            users.insert().execute(id=4, name='name4')
-            eq_(users.select().where(users.c.name == 'name2')
-                .execute().fetchall(), [(2, 'name2')])
-            eq_(users.select(use_labels=True).where(
-                users.c.name == 'name2').execute().fetchall(), [(2, 'name2')])
-            users.delete().where(users.c.id == 3).execute()
-            eq_(users.select().where(users.c.name == 'name3')
-                .execute().fetchall(), [])
-            users.update().where(users.c.name == 'name4'
-                                 ).execute(name='newname')
-            eq_(users.select(use_labels=True).where(
-                users.c.id == 4).execute().fetchall(), [(4, 'newname')])
-        finally:
-            users.drop()
+        users.insert().execute(id=1, name='name1')
+        users.insert().execute(id=2, name='name2')
+        users.insert().execute(id=3, name='name3')
+        users.insert().execute(id=4, name='name4')
+        eq_(users.select().where(users.c.name == 'name2')
+            .execute().fetchall(), [(2, 'name2')])
+        eq_(users.select(use_labels=True).where(
+            users.c.name == 'name2').execute().fetchall(), [(2, 'name2')])
+        users.delete().where(users.c.id == 3).execute()
+        eq_(users.select().where(users.c.name == 'name3')
+            .execute().fetchall(), [])
+        users.update().where(users.c.name == 'name4'
+                             ).execute(name='newname')
+        eq_(users.select(use_labels=True).where(
+            users.c.id == 4).execute().fetchall(), [(4, 'newname')])
 
     def test_quoted_name_bindparam_ok(self):
         from sqlalchemy.sql.elements import quoted_name