]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- flake8
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 26 Sep 2014 18:36:55 +0000 (14:36 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 26 Sep 2014 18:36:55 +0000 (14:36 -0400)
test/engine/test_reconnect.py

index f92b874da8924fe0915f81d429cab5cf98b1a9e3..188bdcad54f7d115dcc2bd25fb82934d5120da25 100644 (file)
@@ -5,19 +5,20 @@ from sqlalchemy.testing.schema import Table, Column
 import sqlalchemy as tsa
 from sqlalchemy import testing
 from sqlalchemy.testing import engines
-from sqlalchemy.testing.util import gc_collect
 from sqlalchemy import exc, util
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.testing import is_not_
 from sqlalchemy.testing.mock import Mock, call
 
+
 class MockError(Exception):
     pass
 
+
 class MockDisconnect(MockError):
     pass
 
+
 def mock_connection():
     def mock_cursor():
         def execute(*args, **kwargs):
@@ -25,10 +26,12 @@ def mock_connection():
                 raise MockDisconnect("Lost the DB connection on execute")
             elif conn.explode in ('execute_no_disconnect', ):
                 raise MockError(
-                    "something broke on execute but we didn't lose the connection")
+                    "something broke on execute but we didn't "
+                    "lose the connection")
             elif conn.explode in ('rollback', 'rollback_no_disconnect'):
                 raise MockError(
-                    "something broke on execute but we didn't lose the connection")
+                    "something broke on execute but we didn't "
+                    "lose the connection")
             elif args and "SELECT" in args[0]:
                 cursor.description = [('foo', None, None, None, None, None)]
             else:
@@ -38,9 +41,9 @@ def mock_connection():
             cursor.fetchall = cursor.fetchone = \
                 Mock(side_effect=MockError("cursor closed"))
         cursor = Mock(
-                    execute=Mock(side_effect=execute),
-                    close=Mock(side_effect=close)
-                )
+            execute=Mock(side_effect=execute),
+            close=Mock(side_effect=close)
+        )
         return cursor
 
     def cursor():
@@ -52,18 +55,21 @@ def mock_connection():
             raise MockDisconnect("Lost the DB connection on rollback")
         if conn.explode == 'rollback_no_disconnect':
             raise MockError(
-                "something broke on rollback but we didn't lose the connection")
+                "something broke on rollback but we didn't "
+                "lose the connection")
         else:
             return
 
     conn = Mock(
-                rollback=Mock(side_effect=rollback),
-                cursor=Mock(side_effect=cursor())
-            )
+        rollback=Mock(side_effect=rollback),
+        cursor=Mock(side_effect=cursor())
+    )
     return conn
 
+
 def MockDBAPI():
     connections = []
+
     def connect():
         while True:
             conn = mock_connection()
@@ -80,27 +86,29 @@ def MockDBAPI():
         connections[:] = []
 
     return Mock(
-                connect=Mock(side_effect=connect()),
-                shutdown=Mock(side_effect=shutdown),
-                dispose=Mock(side_effect=dispose),
-                paramstyle='named',
-                connections=connections,
-                Error=MockError
-            )
+        connect=Mock(side_effect=connect()),
+        shutdown=Mock(side_effect=shutdown),
+        dispose=Mock(side_effect=dispose),
+        paramstyle='named',
+        connections=connections,
+        Error=MockError
+    )
 
 
 class MockReconnectTest(fixtures.TestBase):
+
     def setup(self):
         self.dbapi = MockDBAPI()
 
         self.db = testing_engine(
-                    'postgresql://foo:bar@localhost/test',
-                    options=dict(module=self.dbapi, _initialize=False))
+            'postgresql://foo:bar@localhost/test',
+            options=dict(module=self.dbapi, _initialize=False))
 
         self.mock_connect = call(host='localhost', password='bar',
-                                user='foo', database='test')
+                                 user='foo', database='test')
         # monkeypatch disconnect checker
-        self.db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect)
+        self.db.dialect.is_disconnect = \
+            lambda e, conn, cursor: isinstance(e, MockDisconnect)
 
     def teardown(self):
         self.dbapi.dispose()
@@ -195,7 +203,7 @@ class MockReconnectTest(fixtures.TestBase):
         assert_raises_message(
             tsa.exc.InvalidRequestError,
             "Can't reconnect until invalid transaction is "
-                "rolled back",
+            "rolled back",
             trans.commit
         )
 
@@ -351,16 +359,17 @@ class MockReconnectTest(fixtures.TestBase):
         )
 
     def test_dialect_initialize_once(self):
-        from sqlalchemy.engine.base import Engine
         from sqlalchemy.engine.url import URL
         from sqlalchemy.engine.default import DefaultDialect
-        from sqlalchemy.pool import QueuePool
         dbapi = self.dbapi
 
         mock_dialect = Mock()
+
         class MyURL(URL):
+
             def get_dialect(self):
                 return Dialect
+
         class Dialect(DefaultDialect):
             initialize = Mock()
 
@@ -371,11 +380,11 @@ class MockReconnectTest(fixtures.TestBase):
         eq_(Dialect.initialize.call_count, 1)
 
 
-
 class CursorErrTest(fixtures.TestBase):
     # this isn't really a "reconnect" test, it's more of
     # a generic "recovery".   maybe this test suite should have been
     # named "test_error_recovery".
+
     def _fixture(self, explode_on_exec, initialize):
         class DBAPIError(Exception):
             pass
@@ -394,29 +403,30 @@ class CursorErrTest(fixtures.TestBase):
                             description=[],
                             close=Mock(side_effect=Exception("explode")),
                         )
+
             def connect():
                 while True:
                     yield Mock(
-                                spec=['cursor', 'commit', 'rollback', 'close'],
-                                cursor=Mock(side_effect=cursor()),
-                            )
+                        spec=['cursor', 'commit', 'rollback', 'close'],
+                        cursor=Mock(side_effect=cursor()),
+                    )
 
             return Mock(
-                        Error = DBAPIError,
-                        paramstyle='qmark',
-                        connect=Mock(side_effect=connect())
-                    )
+                Error=DBAPIError,
+                paramstyle='qmark',
+                connect=Mock(side_effect=connect())
+            )
         dbapi = MockDBAPI()
 
         from sqlalchemy.engine import default
         url = Mock(
-                    get_dialect=lambda: default.DefaultDialect,
-                    translate_connect_args=lambda: {},
-                    query={},
-                )
+            get_dialect=lambda: default.DefaultDialect,
+            translate_connect_args=lambda: {},
+            query={},
+        )
         eng = testing_engine(
-                    url,
-                    options=dict(module=dbapi, _initialize=initialize))
+            url,
+            options=dict(module=dbapi, _initialize=initialize))
         eng.pool.logger = Mock()
         return eng
 
@@ -508,7 +518,6 @@ class RealReconnectTest(fixtures.TestBase):
         # pool isn't replaced
         assert self.engine.pool is p2
 
-
     def test_ensure_is_disconnect_gets_connection(self):
         def is_disconnect(e, conn, cursor):
             # connection is still present
@@ -556,6 +565,7 @@ class RealReconnectTest(fixtures.TestBase):
         "Crashes on py3k+cx_oracle")
     def test_explode_in_initializer(self):
         engine = engines.testing_engine()
+
         def broken_initialize(connection):
             connection.execute("select fake_stuff from _fake_table")
 
@@ -569,6 +579,7 @@ class RealReconnectTest(fixtures.TestBase):
         "Crashes on py3k+cx_oracle")
     def test_explode_in_initializer_disconnect(self):
         engine = engines.testing_engine()
+
         def broken_initialize(connection):
             connection.execute("select fake_stuff from _fake_table")
 
@@ -584,7 +595,6 @@ class RealReconnectTest(fixtures.TestBase):
         # invalidate() also doesn't screw up
         assert_raises(exc.DBAPIError, engine.connect)
 
-
     def test_null_pool(self):
         engine = \
             engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
@@ -623,8 +633,8 @@ class RealReconnectTest(fixtures.TestBase):
         assert trans.is_active
         assert_raises_message(
             tsa.exc.StatementError,
-            "Can't reconnect until invalid transaction is "\
-                "rolled back",
+            "Can't reconnect until invalid transaction is "
+            "rolled back",
             conn.execute, select([1])
         )
         assert trans.is_active
@@ -640,13 +650,14 @@ class RealReconnectTest(fixtures.TestBase):
         eq_(conn.execute(select([1])).scalar(), 1)
         assert not conn.invalidated
 
+
 class RecycleTest(fixtures.TestBase):
     __backend__ = True
 
     def test_basic(self):
         for threadlocal in False, True:
             engine = engines.reconnecting_engine(
-                        options={'pool_threadlocal': threadlocal})
+                options={'pool_threadlocal': threadlocal})
 
             conn = engine.contextual_connect()
             eq_(conn.execute(select([1])).scalar(), 1)
@@ -671,6 +682,7 @@ class RecycleTest(fixtures.TestBase):
             eq_(conn.execute(select([1])).scalar(), 1)
             conn.close()
 
+
 class InvalidateDuringResultTest(fixtures.TestBase):
     __backend__ = True
 
@@ -678,8 +690,8 @@ class InvalidateDuringResultTest(fixtures.TestBase):
         self.engine = engines.reconnecting_engine()
         self.meta = MetaData(self.engine)
         table = Table('sometable', self.meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(50)))
+                      Column('id', Integer, primary_key=True),
+                      Column('name', String(50)))
         self.meta.create_all()
         table.insert().execute(
             [{'id': i, 'name': 'row %d' % i} for i in range(1, 100)]
@@ -690,10 +702,10 @@ class InvalidateDuringResultTest(fixtures.TestBase):
         self.engine.dispose()
 
     @testing.fails_if([
-                    '+mysqlconnector', '+mysqldb',
-                    '+cymysql', '+pymysql', '+pg8000'
-                    ], "Buffers the result set and doesn't check for "
-                        "connection close")
+        '+mysqlconnector', '+mysqldb',
+        '+cymysql', '+pymysql', '+pg8000'
+    ], "Buffers the result set and doesn't check for "
+        "connection close")
     def test_invalidate_on_results(self):
         conn = self.engine.connect()
         result = conn.execute('select * from sometable')
@@ -702,4 +714,3 @@ class InvalidateDuringResultTest(fixtures.TestBase):
         self.engine.test_shutdown()
         _assert_invalidated(result.fetchone)
         assert conn.invalidated
-