]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
PEP8 tidy of test/engine/test_reconnect
authorTony Locke <tlocke@tlocke.org.uk>
Sat, 26 Jul 2014 17:15:06 +0000 (18:15 +0100)
committerTony Locke <tlocke@tlocke.org.uk>
Sat, 2 Aug 2014 14:29:36 +0000 (15:29 +0100)
test/engine/test_reconnect.py

index f92b874da8924fe0915f81d429cab5cf98b1a9e3..c82cca5a15d08305fe91736d3193c07e4ef7f728 100644 (file)
@@ -1,23 +1,24 @@
 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
 import time
-from sqlalchemy import select, MetaData, Integer, String, create_engine, pool
+from sqlalchemy import (
+    select, MetaData, Integer, String, create_engine, pool, exc, util)
 from sqlalchemy.testing.schema import Table, Column
 import sqlalchemy as tsa
 from sqlalchemy import testing
 from sqlalchemy.testing import engines
-from sqlalchemy.testing.util import gc_collect
-from sqlalchemy import exc, util
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.testing import is_not_
 from sqlalchemy.testing.mock import Mock, call
 
+
 class MockError(Exception):
     pass
 
+
 class MockDisconnect(MockError):
     pass
 
+
 def mock_connection():
     def mock_cursor():
         def execute(*args, **kwargs):
@@ -25,10 +26,12 @@ def mock_connection():
                 raise MockDisconnect("Lost the DB connection on execute")
             elif conn.explode in ('execute_no_disconnect', ):
                 raise MockError(
-                    "something broke on execute but we didn't lose the connection")
+                    "something broke on execute but we didn't lose the "
+                    "connection")
             elif conn.explode in ('rollback', 'rollback_no_disconnect'):
                 raise MockError(
-                    "something broke on execute but we didn't lose the connection")
+                    "something broke on execute but we didn't lose the "
+                    "connection")
             elif args and "SELECT" in args[0]:
                 cursor.description = [('foo', None, None, None, None, None)]
             else:
@@ -38,9 +41,8 @@ def mock_connection():
             cursor.fetchall = cursor.fetchone = \
                 Mock(side_effect=MockError("cursor closed"))
         cursor = Mock(
-                    execute=Mock(side_effect=execute),
-                    close=Mock(side_effect=close)
-                )
+            execute=Mock(side_effect=execute),
+            close=Mock(side_effect=close))
         return cursor
 
     def cursor():
@@ -52,18 +54,20 @@ def mock_connection():
             raise MockDisconnect("Lost the DB connection on rollback")
         if conn.explode == 'rollback_no_disconnect':
             raise MockError(
-                "something broke on rollback but we didn't lose the connection")
+                "something broke on rollback but we didn't lose the "
+                "connection")
         else:
             return
 
     conn = Mock(
-                rollback=Mock(side_effect=rollback),
-                cursor=Mock(side_effect=cursor())
-            )
+        rollback=Mock(side_effect=rollback),
+        cursor=Mock(side_effect=cursor()))
     return conn
 
+
 def MockDBAPI():
     connections = []
+
     def connect():
         while True:
             conn = mock_connection()
@@ -80,13 +84,12 @@ def MockDBAPI():
         connections[:] = []
 
     return Mock(
-                connect=Mock(side_effect=connect()),
-                shutdown=Mock(side_effect=shutdown),
-                dispose=Mock(side_effect=dispose),
-                paramstyle='named',
-                connections=connections,
-                Error=MockError
-            )
+        connect=Mock(side_effect=connect()),
+        shutdown=Mock(side_effect=shutdown),
+        dispose=Mock(side_effect=dispose),
+        paramstyle='named',
+        connections=connections,
+        Error=MockError)
 
 
 class MockReconnectTest(fixtures.TestBase):
@@ -94,13 +97,14 @@ class MockReconnectTest(fixtures.TestBase):
         self.dbapi = MockDBAPI()
 
         self.db = testing_engine(
-                    'postgresql://foo:bar@localhost/test',
-                    options=dict(module=self.dbapi, _initialize=False))
+            'postgresql://foo:bar@localhost/test',
+            options=dict(module=self.dbapi, _initialize=False))
 
-        self.mock_connect = call(host='localhost', password='bar',
-                                user='foo', database='test')
+        self.mock_connect = call(
+            host='localhost', password='bar', user='foo', database='test')
         # monkeypatch disconnect checker
-        self.db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect)
+        self.db.dialect.is_disconnect = \
+            lambda e, conn, cursor: isinstance(e, MockDisconnect)
 
     def teardown(self):
         self.dbapi.dispose()
@@ -194,10 +198,8 @@ class MockReconnectTest(fixtures.TestBase):
 
         assert_raises_message(
             tsa.exc.InvalidRequestError,
-            "Can't reconnect until invalid transaction is "
-                "rolled back",
-            trans.commit
-        )
+            "Can't reconnect until invalid transaction is rolled back",
+            trans.commit)
 
         assert trans.is_active
         trans.rollback()
@@ -351,16 +353,16 @@ class MockReconnectTest(fixtures.TestBase):
         )
 
     def test_dialect_initialize_once(self):
-        from sqlalchemy.engine.base import Engine
         from sqlalchemy.engine.url import URL
         from sqlalchemy.engine.default import DefaultDialect
-        from sqlalchemy.pool import QueuePool
         dbapi = self.dbapi
 
         mock_dialect = Mock()
+
         class MyURL(URL):
             def get_dialect(self):
                 return Dialect
+
         class Dialect(DefaultDialect):
             initialize = Mock()
 
@@ -371,7 +373,6 @@ class MockReconnectTest(fixtures.TestBase):
         eq_(Dialect.initialize.call_count, 1)
 
 
-
 class CursorErrTest(fixtures.TestBase):
     # this isn't really a "reconnect" test, it's more of
     # a generic "recovery".   maybe this test suite should have been
@@ -394,29 +395,24 @@ class CursorErrTest(fixtures.TestBase):
                             description=[],
                             close=Mock(side_effect=Exception("explode")),
                         )
+
             def connect():
                 while True:
                     yield Mock(
-                                spec=['cursor', 'commit', 'rollback', 'close'],
-                                cursor=Mock(side_effect=cursor()),
-                            )
+                        spec=['cursor', 'commit', 'rollback', 'close'],
+                        cursor=Mock(side_effect=cursor()),)
 
             return Mock(
-                        Error = DBAPIError,
-                        paramstyle='qmark',
-                        connect=Mock(side_effect=connect())
-                    )
+                Error=DBAPIError, paramstyle='qmark',
+                connect=Mock(side_effect=connect()))
         dbapi = MockDBAPI()
 
         from sqlalchemy.engine import default
         url = Mock(
-                    get_dialect=lambda: default.DefaultDialect,
-                    translate_connect_args=lambda: {},
-                    query={},
-                )
+            get_dialect=lambda: default.DefaultDialect,
+            translate_connect_args=lambda: {}, query={},)
         eng = testing_engine(
-                    url,
-                    options=dict(module=dbapi, _initialize=initialize))
+            url, options=dict(module=dbapi, _initialize=initialize))
         eng.pool.logger = Mock()
         return eng
 
@@ -508,7 +504,6 @@ class RealReconnectTest(fixtures.TestBase):
         # pool isn't replaced
         assert self.engine.pool is p2
 
-
     def test_ensure_is_disconnect_gets_connection(self):
         def is_disconnect(e, conn, cursor):
             # connection is still present
@@ -556,6 +551,7 @@ class RealReconnectTest(fixtures.TestBase):
         "Crashes on py3k+cx_oracle")
     def test_explode_in_initializer(self):
         engine = engines.testing_engine()
+
         def broken_initialize(connection):
             connection.execute("select fake_stuff from _fake_table")
 
@@ -569,6 +565,7 @@ class RealReconnectTest(fixtures.TestBase):
         "Crashes on py3k+cx_oracle")
     def test_explode_in_initializer_disconnect(self):
         engine = engines.testing_engine()
+
         def broken_initialize(connection):
             connection.execute("select fake_stuff from _fake_table")
 
@@ -584,7 +581,6 @@ class RealReconnectTest(fixtures.TestBase):
         # invalidate() also doesn't screw up
         assert_raises(exc.DBAPIError, engine.connect)
 
-
     def test_null_pool(self):
         engine = \
             engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
@@ -623,10 +619,8 @@ class RealReconnectTest(fixtures.TestBase):
         assert trans.is_active
         assert_raises_message(
             tsa.exc.StatementError,
-            "Can't reconnect until invalid transaction is "\
-                "rolled back",
-            conn.execute, select([1])
-        )
+            "Can't reconnect until invalid transaction is rolled back",
+            conn.execute, select([1]))
         assert trans.is_active
         assert_raises_message(
             tsa.exc.InvalidRequestError,
@@ -640,13 +634,14 @@ class RealReconnectTest(fixtures.TestBase):
         eq_(conn.execute(select([1])).scalar(), 1)
         assert not conn.invalidated
 
+
 class RecycleTest(fixtures.TestBase):
     __backend__ = True
 
     def test_basic(self):
         for threadlocal in False, True:
             engine = engines.reconnecting_engine(
-                        options={'pool_threadlocal': threadlocal})
+                options={'pool_threadlocal': threadlocal})
 
             conn = engine.contextual_connect()
             eq_(conn.execute(select([1])).scalar(), 1)
@@ -671,13 +666,15 @@ class RecycleTest(fixtures.TestBase):
             eq_(conn.execute(select([1])).scalar(), 1)
             conn.close()
 
+
 class InvalidateDuringResultTest(fixtures.TestBase):
     __backend__ = True
 
     def setup(self):
         self.engine = engines.reconnecting_engine()
         self.meta = MetaData(self.engine)
-        table = Table('sometable', self.meta,
+        table = Table(
+            'sometable', self.meta,
             Column('id', Integer, primary_key=True),
             Column('name', String(50)))
         self.meta.create_all()
@@ -690,10 +687,8 @@ class InvalidateDuringResultTest(fixtures.TestBase):
         self.engine.dispose()
 
     @testing.fails_if([
-                    '+mysqlconnector', '+mysqldb',
-                    '+cymysql', '+pymysql', '+pg8000'
-                    ], "Buffers the result set and doesn't check for "
-                        "connection close")
+        '+mysqlconnector', '+mysqldb', '+cymysql', '+pymysql', '+pg8000'],
+        "Buffers the result set and doesn't check for connection close")
     def test_invalidate_on_results(self):
         conn = self.engine.connect()
         result = conn.execute('select * from sometable')
@@ -702,4 +697,3 @@ class InvalidateDuringResultTest(fixtures.TestBase):
         self.engine.test_shutdown()
         _assert_invalidated(result.fetchone)
         assert conn.invalidated
-