From: Mike Bayer Date: Fri, 26 Sep 2014 18:36:55 +0000 (-0400) Subject: - flake8 X-Git-Tag: rel_0_9_8~15 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=d6aea828c45d1ecf386e5654e8b5f5f94e6e5ef6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - flake8 --- diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index f92b874da8..188bdcad54 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -5,19 +5,20 @@ from sqlalchemy.testing.schema import Table, Column import sqlalchemy as tsa from sqlalchemy import testing from sqlalchemy.testing import engines -from sqlalchemy.testing.util import gc_collect from sqlalchemy import exc, util from sqlalchemy.testing import fixtures from sqlalchemy.testing.engines import testing_engine -from sqlalchemy.testing import is_not_ from sqlalchemy.testing.mock import Mock, call + class MockError(Exception): pass + class MockDisconnect(MockError): pass + def mock_connection(): def mock_cursor(): def execute(*args, **kwargs): @@ -25,10 +26,12 @@ def mock_connection(): raise MockDisconnect("Lost the DB connection on execute") elif conn.explode in ('execute_no_disconnect', ): raise MockError( - "something broke on execute but we didn't lose the connection") + "something broke on execute but we didn't " + "lose the connection") elif conn.explode in ('rollback', 'rollback_no_disconnect'): raise MockError( - "something broke on execute but we didn't lose the connection") + "something broke on execute but we didn't " + "lose the connection") elif args and "SELECT" in args[0]: cursor.description = [('foo', None, None, None, None, None)] else: @@ -38,9 +41,9 @@ def mock_connection(): cursor.fetchall = cursor.fetchone = \ Mock(side_effect=MockError("cursor closed")) cursor = Mock( - execute=Mock(side_effect=execute), - close=Mock(side_effect=close) - ) + execute=Mock(side_effect=execute), + close=Mock(side_effect=close) + ) return cursor def cursor(): @@ -52,18 +55,21 @@ def mock_connection(): raise MockDisconnect("Lost the DB connection on rollback") if conn.explode == 'rollback_no_disconnect': raise MockError( - "something broke on rollback but we didn't lose the connection") + "something broke on rollback but we didn't " + "lose the connection") else: return conn = Mock( - rollback=Mock(side_effect=rollback), - cursor=Mock(side_effect=cursor()) - ) + rollback=Mock(side_effect=rollback), + cursor=Mock(side_effect=cursor()) + ) return conn + def MockDBAPI(): connections = [] + def connect(): while True: conn = mock_connection() @@ -80,27 +86,29 @@ def MockDBAPI(): connections[:] = [] return Mock( - connect=Mock(side_effect=connect()), - shutdown=Mock(side_effect=shutdown), - dispose=Mock(side_effect=dispose), - paramstyle='named', - connections=connections, - Error=MockError - ) + connect=Mock(side_effect=connect()), + shutdown=Mock(side_effect=shutdown), + dispose=Mock(side_effect=dispose), + paramstyle='named', + connections=connections, + Error=MockError + ) class MockReconnectTest(fixtures.TestBase): + def setup(self): self.dbapi = MockDBAPI() self.db = testing_engine( - 'postgresql://foo:bar@localhost/test', - options=dict(module=self.dbapi, _initialize=False)) + 'postgresql://foo:bar@localhost/test', + options=dict(module=self.dbapi, _initialize=False)) self.mock_connect = call(host='localhost', password='bar', - user='foo', database='test') + user='foo', database='test') # monkeypatch disconnect checker - self.db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect) + self.db.dialect.is_disconnect = \ + lambda e, conn, cursor: isinstance(e, MockDisconnect) def teardown(self): self.dbapi.dispose() @@ -195,7 +203,7 @@ class MockReconnectTest(fixtures.TestBase): assert_raises_message( tsa.exc.InvalidRequestError, "Can't reconnect until invalid transaction is " - "rolled back", + "rolled back", trans.commit ) @@ -351,16 +359,17 @@ class MockReconnectTest(fixtures.TestBase): ) def test_dialect_initialize_once(self): - from sqlalchemy.engine.base import Engine from sqlalchemy.engine.url import URL from sqlalchemy.engine.default import DefaultDialect - from sqlalchemy.pool import QueuePool dbapi = self.dbapi mock_dialect = Mock() + class MyURL(URL): + def get_dialect(self): return Dialect + class Dialect(DefaultDialect): initialize = Mock() @@ -371,11 +380,11 @@ class MockReconnectTest(fixtures.TestBase): eq_(Dialect.initialize.call_count, 1) - class CursorErrTest(fixtures.TestBase): # this isn't really a "reconnect" test, it's more of # a generic "recovery". maybe this test suite should have been # named "test_error_recovery". + def _fixture(self, explode_on_exec, initialize): class DBAPIError(Exception): pass @@ -394,29 +403,30 @@ class CursorErrTest(fixtures.TestBase): description=[], close=Mock(side_effect=Exception("explode")), ) + def connect(): while True: yield Mock( - spec=['cursor', 'commit', 'rollback', 'close'], - cursor=Mock(side_effect=cursor()), - ) + spec=['cursor', 'commit', 'rollback', 'close'], + cursor=Mock(side_effect=cursor()), + ) return Mock( - Error = DBAPIError, - paramstyle='qmark', - connect=Mock(side_effect=connect()) - ) + Error=DBAPIError, + paramstyle='qmark', + connect=Mock(side_effect=connect()) + ) dbapi = MockDBAPI() from sqlalchemy.engine import default url = Mock( - get_dialect=lambda: default.DefaultDialect, - translate_connect_args=lambda: {}, - query={}, - ) + get_dialect=lambda: default.DefaultDialect, + translate_connect_args=lambda: {}, + query={}, + ) eng = testing_engine( - url, - options=dict(module=dbapi, _initialize=initialize)) + url, + options=dict(module=dbapi, _initialize=initialize)) eng.pool.logger = Mock() return eng @@ -508,7 +518,6 @@ class RealReconnectTest(fixtures.TestBase): # pool isn't replaced assert self.engine.pool is p2 - def test_ensure_is_disconnect_gets_connection(self): def is_disconnect(e, conn, cursor): # connection is still present @@ -556,6 +565,7 @@ class RealReconnectTest(fixtures.TestBase): "Crashes on py3k+cx_oracle") def test_explode_in_initializer(self): engine = engines.testing_engine() + def broken_initialize(connection): connection.execute("select fake_stuff from _fake_table") @@ -569,6 +579,7 @@ class RealReconnectTest(fixtures.TestBase): "Crashes on py3k+cx_oracle") def test_explode_in_initializer_disconnect(self): engine = engines.testing_engine() + def broken_initialize(connection): connection.execute("select fake_stuff from _fake_table") @@ -584,7 +595,6 @@ class RealReconnectTest(fixtures.TestBase): # invalidate() also doesn't screw up assert_raises(exc.DBAPIError, engine.connect) - def test_null_pool(self): engine = \ engines.reconnecting_engine(options=dict(poolclass=pool.NullPool)) @@ -623,8 +633,8 @@ class RealReconnectTest(fixtures.TestBase): assert trans.is_active assert_raises_message( tsa.exc.StatementError, - "Can't reconnect until invalid transaction is "\ - "rolled back", + "Can't reconnect until invalid transaction is " + "rolled back", conn.execute, select([1]) ) assert trans.is_active @@ -640,13 +650,14 @@ class RealReconnectTest(fixtures.TestBase): eq_(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated + class RecycleTest(fixtures.TestBase): __backend__ = True def test_basic(self): for threadlocal in False, True: engine = engines.reconnecting_engine( - options={'pool_threadlocal': threadlocal}) + options={'pool_threadlocal': threadlocal}) conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) @@ -671,6 +682,7 @@ class RecycleTest(fixtures.TestBase): eq_(conn.execute(select([1])).scalar(), 1) conn.close() + class InvalidateDuringResultTest(fixtures.TestBase): __backend__ = True @@ -678,8 +690,8 @@ class InvalidateDuringResultTest(fixtures.TestBase): self.engine = engines.reconnecting_engine() self.meta = MetaData(self.engine) table = Table('sometable', self.meta, - Column('id', Integer, primary_key=True), - Column('name', String(50))) + Column('id', Integer, primary_key=True), + Column('name', String(50))) self.meta.create_all() table.insert().execute( [{'id': i, 'name': 'row %d' % i} for i in range(1, 100)] @@ -690,10 +702,10 @@ class InvalidateDuringResultTest(fixtures.TestBase): self.engine.dispose() @testing.fails_if([ - '+mysqlconnector', '+mysqldb', - '+cymysql', '+pymysql', '+pg8000' - ], "Buffers the result set and doesn't check for " - "connection close") + '+mysqlconnector', '+mysqldb', + '+cymysql', '+pymysql', '+pg8000' + ], "Buffers the result set and doesn't check for " + "connection close") def test_invalidate_on_results(self): conn = self.engine.connect() result = conn.execute('select * from sometable') @@ -702,4 +714,3 @@ class InvalidateDuringResultTest(fixtures.TestBase): self.engine.test_shutdown() _assert_invalidated(result.fetchone) assert conn.invalidated -