From: Mike Bayer Date: Tue, 29 Jul 2014 18:17:29 +0000 (-0400) Subject: pep8 cleanup X-Git-Tag: rel_1_0_0b1~263 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e5620993bf89338959a0cce4ba567d92b1dd1821;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git pep8 cleanup --- diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 36f3fbf729..2d2086d5f9 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -14,15 +14,17 @@ from sqlalchemy import testing from sqlalchemy.testing import engines from sqlalchemy import util from sqlalchemy.testing.engines import testing_engine -import logging.handlers from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam from sqlalchemy.engine import result as _result, default from sqlalchemy.engine.base import Engine from sqlalchemy.testing import fixtures from sqlalchemy.testing.mock import Mock, call, patch from contextlib import contextmanager, nested +import logging.handlers # needed for logging tests to work correctly users, metadata, users_autoinc = None, None, None + + class ExecuteTest(fixtures.TestBase): __backend__ = True @@ -30,13 +32,16 @@ class ExecuteTest(fixtures.TestBase): def setup_class(cls): global users, users_autoinc, metadata metadata = MetaData(testing.db) - users = Table('users', metadata, + users = Table( + 'users', metadata, Column('user_id', INT, primary_key=True, autoincrement=False), Column('user_name', VARCHAR(20)), ) - users_autoinc = Table('users_autoinc', metadata, - Column('user_id', INT, primary_key=True, - test_needs_autoincrement=True), + users_autoinc = Table( + 'users_autoinc', metadata, + Column( + 'user_id', INT, primary_key=True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) metadata.create_all() @@ -54,12 +59,12 @@ class ExecuteTest(fixtures.TestBase): "pg8000 still doesn't allow single paren without params") def test_no_params_option(self): stmt = "SELECT '%'" + testing.db.dialect.statement_compiler( - testing.db.dialect, None).default_from() + testing.db.dialect, None).default_from() conn = testing.db.connect() result = conn.\ - execution_options(no_parameters=True).\ - scalar(stmt) + execution_options(no_parameters=True).\ + scalar(stmt) eq_(result, '%') @testing.fails_on_everything_except('firebird', @@ -86,7 +91,7 @@ class ExecuteTest(fixtures.TestBase): (5, 'barney'), (6, 'donkey'), (7, 'sally'), - ] + ] for multiparam, param in [ (("jack", "fred"), {}), ((["jack", "fred"],), {}) @@ -99,7 +104,8 @@ class ExecuteTest(fixtures.TestBase): (1, 'jack'), (2, 'fred') ] - res = conn.execute("select * from users where user_name=?", + res = conn.execute( + "select * from users where user_name=?", "jack" ) assert res.fetchall() == [(1, 'jack')] @@ -113,8 +119,9 @@ class ExecuteTest(fixtures.TestBase): conn.close() # some psycopg2 versions bomb this. - @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+pymysql', - 'mysql+cymysql', 'mysql+mysqlconnector', 'postgresql') + @testing.fails_on_everything_except( + 'mysql+mysqldb', 'mysql+pymysql', + 'mysql+cymysql', 'mysql+mysqlconnector', 'postgresql') @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported') def test_raw_sprintf(self): def go(conn): @@ -126,8 +133,10 @@ class ExecuteTest(fixtures.TestBase): 'values (%s, %s)', 4, 'sally') conn.execute('insert into users (user_id) values (%s)', 5) res = conn.execute('select * from users order by user_id') - assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, - 'horse'), (4, 'sally'), (5, None)] + assert res.fetchall() == [ + (1, 'jack'), (2, 'ed'), + (3, 'horse'), (4, 'sally'), (5, None) + ] for multiparam, param in [ (("jack", "ed"), {}), ((["jack", "ed"],), {}) @@ -140,7 +149,8 @@ class ExecuteTest(fixtures.TestBase): (1, 'jack'), (2, 'ed') ] - res = conn.execute("select * from users where user_name=%s", + res = conn.execute( + "select * from users where user_name=%s", "jack" ) assert res.fetchall() == [(1, 'jack')] @@ -157,25 +167,29 @@ class ExecuteTest(fixtures.TestBase): # versions have a bug that bombs out on this test. (1.2.2b3, # 1.2.2c1, 1.2.2) - @testing.skip_if(lambda : testing.against('mysql+mysqldb'), - 'db-api flaky') - @testing.fails_on_everything_except('postgresql+psycopg2', - 'postgresql+pypostgresql', 'mysql+mysqlconnector', - 'mysql+pymysql', 'mysql+cymysql') + @testing.skip_if( + lambda: testing.against('mysql+mysqldb'), 'db-api flaky') + @testing.fails_on_everything_except( + 'postgresql+psycopg2', + 'postgresql+pypostgresql', 'mysql+mysqlconnector', + 'mysql+pymysql', 'mysql+cymysql') def test_raw_python(self): def go(conn): - conn.execute('insert into users (user_id, user_name) ' - 'values (%(id)s, %(name)s)', {'id': 1, 'name' - : 'jack'}) - conn.execute('insert into users (user_id, user_name) ' - 'values (%(id)s, %(name)s)', {'id': 2, 'name' - : 'ed'}, {'id': 3, 'name': 'horse'}) - conn.execute('insert into users (user_id, user_name) ' - 'values (%(id)s, %(name)s)', id=4, name='sally' - ) + conn.execute( + 'insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', + {'id': 1, 'name': 'jack'}) + conn.execute( + 'insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', + {'id': 2, 'name': 'ed'}, {'id': 3, 'name': 'horse'}) + conn.execute( + 'insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', id=4, name='sally' + ) res = conn.execute('select * from users order by user_id') - assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, - 'horse'), (4, 'sally')] + assert res.fetchall() == [ + (1, 'jack'), (2, 'ed'), (3, 'horse'), (4, 'sally')] conn.execute('delete from users') go(testing.db) conn = testing.db.connect() @@ -189,18 +203,18 @@ class ExecuteTest(fixtures.TestBase): def go(conn): conn.execute('insert into users (user_id, user_name) ' 'values (:id, :name)', {'id': 1, 'name': 'jack' - }) + }) conn.execute('insert into users (user_id, user_name) ' 'values (:id, :name)', {'id': 2, 'name': 'ed' - }, {'id': 3, 'name': 'horse'}) + }, {'id': 3, 'name': 'horse'}) conn.execute('insert into users (user_id, user_name) ' 'values (:id, :name)', id=4, name='sally') res = conn.execute('select * from users order by user_id') - assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, - 'horse'), (4, 'sally')] + assert res.fetchall() == [ + (1, 'jack'), (2, 'ed'), (3, 'horse'), (4, 'sally')] conn.execute('delete from users') go(testing.db) - conn= testing.db.connect() + conn = testing.db.connect() try: go(conn) finally: @@ -223,11 +237,11 @@ class ExecuteTest(fixtures.TestBase): with e.connect() as c: c.connection.cursor = Mock( - return_value=Mock( - execute=Mock( - side_effect=TypeError("I'm not a DBAPI error") - )) - ) + return_value=Mock( + execute=Mock( + side_effect=TypeError("I'm not a DBAPI error") + )) + ) assert_raises_message( TypeError, @@ -261,10 +275,10 @@ class ExecuteTest(fixtures.TestBase): conn.execute, "select 1" ) - def test_exception_wrapping_non_dbapi_statement(self): class MyType(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): raise Exception("nope") @@ -273,10 +287,10 @@ class ExecuteTest(fixtures.TestBase): tsa.exc.StatementError, r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", conn.execute, - select([1]).\ - where( - column('foo') == literal('bar', MyType()) - ) + select([1]). + where( + column('foo') == literal('bar', MyType()) + ) ) _go(testing.db) conn = testing.db.connect() @@ -294,28 +308,31 @@ class ExecuteTest(fixtures.TestBase): "A value is required for bind parameter 'uname'" r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k else - util.u( - "A value is required for bind parameter 'uname'" - '.*SELECT users.user_name AS .méil.') - , + util.u( + "A value is required for bind parameter 'uname'" + '.*SELECT users.user_name AS .méil.'), conn.execute, select([users.c.user_name.label(name)]).where( - users.c.user_name == bindparam("uname")), + users.c.user_name == bindparam("uname")), {'uname_incorrect': 'foo'} ) def test_stmt_exception_pickleable_no_dbapi(self): self._test_stmt_exception_pickleable(Exception("hello world")) - @testing.crashes("postgresql+psycopg2", - "Older versions don't support cursor pickling, newer ones do") - @testing.fails_on("mysql+oursql", - "Exception doesn't come back exactly the same from pickle") - @testing.fails_on("mysql+mysqlconnector", - "Exception doesn't come back exactly the same from pickle") - @testing.fails_on("oracle+cx_oracle", - "cx_oracle exception seems to be having " - "some issue with pickling") + @testing.crashes( + "postgresql+psycopg2", + "Older versions don't support cursor pickling, newer ones do") + @testing.fails_on( + "mysql+oursql", + "Exception doesn't come back exactly the same from pickle") + @testing.fails_on( + "mysql+mysqlconnector", + "Exception doesn't come back exactly the same from pickle") + @testing.fails_on( + "oracle+cx_oracle", + "cx_oracle exception seems to be having " + "some issue with pickling") def test_stmt_exception_pickleable_plus_dbapi(self): raw = testing.db.raw_connection() the_orig = None @@ -333,21 +350,22 @@ class ExecuteTest(fixtures.TestBase): def _test_stmt_exception_pickleable(self, orig): for sa_exc in ( tsa.exc.StatementError("some error", - "select * from table", - {"foo":"bar"}, - orig), + "select * from table", + {"foo": "bar"}, + orig), tsa.exc.InterfaceError("select * from table", - {"foo":"bar"}, - orig), + {"foo": "bar"}, + orig), tsa.exc.NoReferencedTableError("message", "tname"), tsa.exc.NoReferencedColumnError("message", "tname", "cname"), - tsa.exc.CircularDependencyError("some message", [1, 2, 3], [(1, 2), (3, 4)]), + tsa.exc.CircularDependencyError( + "some message", [1, 2, 3], [(1, 2), (3, 4)]), ): for loads, dumps in picklers(): repickled = loads(dumps(sa_exc)) eq_(repickled.args[0], sa_exc.args[0]) if isinstance(sa_exc, tsa.exc.StatementError): - eq_(repickled.params, {"foo":"bar"}) + eq_(repickled.params, {"foo": "bar"}) eq_(repickled.statement, sa_exc.statement) if hasattr(sa_exc, "connection_invalidated"): eq_(repickled.connection_invalidated, @@ -360,6 +378,7 @@ class ExecuteTest(fixtures.TestBase): class MyType(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): raise MyException("nope") @@ -368,10 +387,10 @@ class ExecuteTest(fixtures.TestBase): MyException, "nope", conn.execute, - select([1]).\ - where( - column('foo') == literal('bar', MyType()) - ) + select([1]). + where( + column('foo') == literal('bar', MyType()) + ) ) _go(testing.db) conn = testing.db.connect() @@ -384,21 +403,24 @@ class ExecuteTest(fixtures.TestBase): """test that execute() interprets [] as a list with no params""" testing.db.execute(users_autoinc.insert(). - values(user_name=bindparam('name', None)), []) + values(user_name=bindparam('name', None)), []) eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1, None)]) @testing.requires.ad_hoc_engines def test_engine_level_options(self): eng = engines.testing_engine(options={'execution_options': - {'foo': 'bar'}}) + {'foo': 'bar'}}) with eng.contextual_connect() as conn: eq_(conn._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['foo' - ], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['bat' - ], 'hoho') - eq_(conn.execution_options(foo='hoho')._execution_options['foo' - ], 'hoho') + eq_( + conn.execution_options(bat='hoho')._execution_options['foo'], + 'bar') + eq_( + conn.execution_options(bat='hoho')._execution_options['bat'], + 'hoho') + eq_( + conn.execution_options(foo='hoho')._execution_options['foo'], + 'hoho') eng.update_execution_options(foo='hoho') conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'hoho') @@ -406,7 +428,7 @@ class ExecuteTest(fixtures.TestBase): @testing.requires.ad_hoc_engines def test_generative_engine_execution_options(self): eng = engines.testing_engine(options={'execution_options': - {'base': 'x1'}}) + {'base': 'x1'}}) eng1 = eng.execution_options(foo="b1") eng2 = eng.execution_options(foo="b2") @@ -414,15 +436,15 @@ class ExecuteTest(fixtures.TestBase): eng2a = eng2.execution_options(foo="b3", bar="a2") eq_(eng._execution_options, - {'base': 'x1'}) + {'base': 'x1'}) eq_(eng1._execution_options, - {'base': 'x1', 'foo': 'b1'}) + {'base': 'x1', 'foo': 'b1'}) eq_(eng2._execution_options, - {'base': 'x1', 'foo': 'b2'}) + {'base': 'x1', 'foo': 'b2'}) eq_(eng1a._execution_options, - {'base': 'x1', 'foo': 'b1', 'bar': 'a1'}) + {'base': 'x1', 'foo': 'b1', 'bar': 'a1'}) eq_(eng2a._execution_options, - {'base': 'x1', 'foo': 'b3', 'bar': 'a2'}) + {'base': 'x1', 'foo': 'b3', 'bar': 'a2'}) is_(eng1a.pool, eng.pool) # test pool is shared @@ -433,15 +455,18 @@ class ExecuteTest(fixtures.TestBase): @testing.requires.ad_hoc_engines def test_generative_engine_event_dispatch(self): canary = [] + def l1(*arg, **kw): canary.append("l1") + def l2(*arg, **kw): canary.append("l2") + def l3(*arg, **kw): canary.append("l3") eng = engines.testing_engine(options={'execution_options': - {'base': 'x1'}}) + {'base': 'x1'}}) event.listen(eng, "before_execute", l1) eng1 = eng.execution_options(foo="b1") @@ -465,6 +490,7 @@ class ExecuteTest(fixtures.TestBase): def test_unicode_test_fails_warning(self): class MockCursor(engines.DBAPIProxyCursor): + def execute(self, stmt, params=None, **kw): if "test unicode returns" in stmt: raise self.engine.dialect.dbapi.DatabaseError("boom") @@ -491,16 +517,17 @@ class ExecuteTest(fixtures.TestBase): eq_(eng.scalar(select([1])), 1) eng.dispose() + class ConvenienceExecuteTest(fixtures.TablesTest): __backend__ = True @classmethod def define_tables(cls, metadata): cls.table = Table('exec_test', metadata, - Column('a', Integer), - Column('b', Integer), - test_needs_acid=True - ) + Column('a', Integer), + Column('b', Integer), + test_needs_acid=True + ) def _trans_fn(self, is_transaction=False): def go(conn, x, value=None): @@ -539,8 +566,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest): mock_connection = Mock( return_value=Mock( - begin=Mock(side_effect=Exception("boom")) - ) + begin=Mock(side_effect=Exception("boom")) + ) ) engine._connection_cls = mock_connection assert_raises( @@ -566,8 +593,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_transaction_tlocal_engine_ctx_commit(self): fn = self._trans_fn() engine = engines.testing_engine(options=dict( - strategy='threadlocal', - pool=testing.db.pool)) + strategy='threadlocal', + pool=testing.db.pool)) ctx = engine.begin() testing.run_as_contextmanager(ctx, fn, 5, value=8) self._assert_fn(5, value=8) @@ -575,8 +602,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_transaction_tlocal_engine_ctx_rollback(self): fn = self._trans_rollback_fn() engine = engines.testing_engine(options=dict( - strategy='threadlocal', - pool=testing.db.pool)) + strategy='threadlocal', + pool=testing.db.pool)) ctx = engine.begin() assert_raises_message( Exception, @@ -648,6 +675,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest): ) self._assert_no_data() + class CompiledCacheTest(fixtures.TestBase): __backend__ = True @@ -656,10 +684,10 @@ class CompiledCacheTest(fixtures.TestBase): global users, metadata metadata = MetaData(testing.db) users = Table('users', metadata, - Column('user_id', INT, primary_key=True, - test_needs_autoincrement=True), - Column('user_name', VARCHAR(20)), - ) + Column('user_id', INT, primary_key=True, + test_needs_autoincrement=True), + Column('user_name', VARCHAR(20)), + ) metadata.create_all() @engines.close_first @@ -676,16 +704,18 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn = conn.execution_options(compiled_cache=cache) ins = users.insert() - cached_conn.execute(ins, {'user_name':'u1'}) - cached_conn.execute(ins, {'user_name':'u2'}) - cached_conn.execute(ins, {'user_name':'u3'}) + cached_conn.execute(ins, {'user_name': 'u1'}) + cached_conn.execute(ins, {'user_name': 'u2'}) + cached_conn.execute(ins, {'user_name': 'u3'}) assert len(cache) == 1 eq_(conn.execute("select count(*) from users").scalar(), 3) class MockStrategyTest(fixtures.TestBase): + def _engine_fixture(self): buf = util.StringIO() + def dump(sql, *multiparams, **params): buf.write(util.text_type(sql.compile(dialect=engine.dialect))) engine = create_engine('postgresql://', strategy='mock', executor=dump) @@ -695,8 +725,9 @@ class MockStrategyTest(fixtures.TestBase): engine, buf = self._engine_fixture() metadata = MetaData() t = Table('testtable', metadata, - Column('pk', Integer, Sequence('testtable_pk_seq'), primary_key=True) - ) + Column( + 'pk', Integer, Sequence('testtable_pk_seq'), primary_key=True) + ) t.create(engine) t.drop(engine) @@ -711,6 +742,7 @@ class MockStrategyTest(fixtures.TestBase): ["SEQUENCE", "TABLE"] ) + class ResultProxyTest(fixtures.TestBase): __backend__ = True @@ -721,6 +753,7 @@ class ResultProxyTest(fixtures.TestBase): from sqlalchemy.engine import RowProxy class MyList(object): + def __init__(self, l): self.l = l @@ -730,8 +763,8 @@ class ResultProxyTest(fixtures.TestBase): def __getitem__(self, i): return list.__getitem__(self.l, i) - proxy = RowProxy(object(), MyList(['value']), [None], {'key' - : (None, None, 0), 0: (None, None, 0)}) + proxy = RowProxy(object(), MyList(['value']), [None], { + 'key': (None, None, 0), 0: (None, None, 0)}) eq_(list(proxy), ['value']) eq_(proxy[0], 'value') eq_(proxy['key'], 'value') @@ -751,22 +784,23 @@ class ResultProxyTest(fixtures.TestBase): engine = engines.testing_engine() t = Table('t1', metadata, - Column('data', String(10)) - ) + Column('data', String(10)) + ) metadata.create_all(engine) - with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: + with patch.object( + engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: mock_rowcount.__get__ = Mock() engine.execute(t.insert(), - {'data': 'd1'}, - {'data': 'd2'}, - {'data': 'd3'}) + {'data': 'd1'}, + {'data': 'd2'}, + {'data': 'd3'}) eq_(len(mock_rowcount.__get__.mock_calls), 0) eq_( - engine.execute(t.select()).fetchall(), - [('d1', ), ('d2', ), ('d3', )] + engine.execute(t.select()).fetchall(), + [('d1', ), ('d2', ), ('d3', )] ) eq_(len(mock_rowcount.__get__.mock_calls), 0) @@ -777,26 +811,25 @@ class ResultProxyTest(fixtures.TestBase): engine.execute(t.delete()) eq_(len(mock_rowcount.__get__.mock_calls), 2) - def test_rowproxy_is_sequence(self): import collections from sqlalchemy.engine import RowProxy - row = RowProxy(object(), ['value'], [None], {'key' - : (None, None, 0), 0: (None, None, 0)}) + row = RowProxy( + object(), ['value'], [None], + {'key': (None, None, 0), 0: (None, None, 0)}) assert isinstance(row, collections.Sequence) @testing.requires.cextensions def test_row_c_sequence_check(self): import csv - import collections metadata = MetaData() metadata.bind = 'sqlite://' users = Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(40)), - ) + Column('id', Integer, primary_key=True), + Column('name', String(40)), + ) users.create() users.insert().execute(name='Test') @@ -818,7 +851,7 @@ class ResultProxyTest(fixtures.TestBase): lambda r: r.last_updated_params(), lambda r: r.prefetch_cols(), lambda r: r.postfetch_cols(), - lambda r : r.inserted_primary_key + lambda r: r.inserted_primary_key ], "Statement is not a compiled expression construct." ), @@ -826,7 +859,7 @@ class ResultProxyTest(fixtures.TestBase): select([1]), [ lambda r: r.last_inserted_params(), - lambda r : r.inserted_primary_key + lambda r: r.inserted_primary_key ], r"Statement is not an insert\(\) expression construct." ), @@ -841,7 +874,7 @@ class ResultProxyTest(fixtures.TestBase): select([1]), [ lambda r: r.prefetch_cols(), - lambda r : r.postfetch_cols() + lambda r: r.postfetch_cols() ], r"Statement is not an insert\(\) " r"or update\(\) expression construct." @@ -861,7 +894,9 @@ class ResultProxyTest(fixtures.TestBase): finally: r.close() + class ExecutionOptionsTest(fixtures.TestBase): + def test_dialect_conn_options(self): engine = testing_engine("sqlite://", options=dict(_initialize=False)) engine.dialect = Mock() @@ -884,7 +919,7 @@ class ExecutionOptionsTest(fixtures.TestBase): def test_dialect_engine_construction_options(self): dialect = Mock() engine = Engine(Mock(), dialect, Mock(), - execution_options={"foo": "bar"}) + execution_options={"foo": "bar"}) eq_( dialect.set_engine_execution_options.mock_calls, [call(engine, {"foo": "bar"})] @@ -892,13 +927,13 @@ class ExecutionOptionsTest(fixtures.TestBase): def test_propagate_engine_to_connection(self): engine = testing_engine("sqlite://", - options=dict(execution_options={"foo": "bar"})) + options=dict(execution_options={"foo": "bar"})) conn = engine.connect() eq_(conn._execution_options, {"foo": "bar"}) def test_propagate_option_engine_to_connection(self): e1 = testing_engine("sqlite://", - options=dict(execution_options={"foo": "bar"})) + options=dict(execution_options={"foo": "bar"})) e2 = e1.execution_options(bat="hoho") c1 = e1.connect() c2 = e2.connect() @@ -906,28 +941,25 @@ class ExecutionOptionsTest(fixtures.TestBase): eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"}) - - - class AlternateResultProxyTest(fixtures.TestBase): __requires__ = ('sqlite', ) @classmethod def setup_class(cls): - from sqlalchemy.engine import base, default cls.engine = engine = testing_engine('sqlite://') m = MetaData() cls.table = t = Table('test', m, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) + Column('x', Integer, primary_key=True), + Column('y', String(50, convert_unicode='force')) + ) m.create_all(engine) engine.execute(t.insert(), [ - {'x':i, 'y':"t_%d" % i} for i in range(1, 12) + {'x': i, 'y': "t_%d" % i} for i in range(1, 12) ]) def _test_proxy(self, cls): class ExcCtx(default.DefaultExecutionContext): + def get_result_proxy(self): return cls(self) self.engine.dialect.execution_ctx_cls = ExcCtx @@ -970,6 +1002,7 @@ class AlternateResultProxyTest(fixtures.TestBase): def test_buffered_column_result_proxy(self): self._test_proxy(_result.BufferedColumnResultProxy) + class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', __backend__ = True @@ -987,9 +1020,9 @@ class EngineEventsTest(fixtures.TestBase): teststmt, testparams, testmultiparams = \ received.pop(0) teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', - teststmt).strip() - if teststmt.startswith(stmt) and (testparams - == params or testparams == posn): + teststmt).strip() + if teststmt.startswith(stmt) and ( + testparams == params or testparams == posn): break def test_per_engine_independence(self): @@ -1010,7 +1043,6 @@ class EngineEventsTest(fixtures.TestBase): e2.execute(s2) eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2]) - def test_per_engine_plus_global(self): canary = Mock() event.listen(Engine, "before_execute", canary.be1) @@ -1074,7 +1106,7 @@ class EngineEventsTest(fixtures.TestBase): event.listen(e1, "before_execute", canary.be1) conn = e1._connection_cls(e1, connection=e1.raw_connection(), - _has_events=False) + _has_events=False) conn.execute(select([1])) @@ -1096,14 +1128,14 @@ class EngineEventsTest(fixtures.TestBase): dialect = conn.dialect ctx = dialect.execution_ctx_cls._init_statement( - dialect, conn, conn.connection, stmt, {}) + dialect, conn, conn.connection, stmt, {}) ctx._execute_scalar(stmt, Integer()) eq_(canary.bce.mock_calls, - [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) + [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) eq_(canary.ace.mock_calls, - [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) + [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) def test_cursor_events_execute(self): canary = Mock() @@ -1120,15 +1152,15 @@ class EngineEventsTest(fixtures.TestBase): ctx = result.context eq_(canary.bce.mock_calls, - [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) + [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) eq_(canary.ace.mock_calls, - [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) - + [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)]) def test_argument_format_execute(self): def before_execute(conn, clauseelement, multiparams, params): assert isinstance(multiparams, (list, tuple)) assert isinstance(params, dict) + def after_execute(conn, clauseelement, multiparams, params, result): assert isinstance(multiparams, (list, tuple)) assert isinstance(params, dict) @@ -1141,9 +1173,6 @@ class EngineEventsTest(fixtures.TestBase): e1.execute(select([1]).compile(dialect=e1.dialect)) e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {}) - - - @testing.fails_on('firebird', 'Data type unknown') def test_execute_events(self): @@ -1151,43 +1180,43 @@ class EngineEventsTest(fixtures.TestBase): cursor_stmts = [] def execute(conn, clauseelement, multiparams, - params ): + params): stmts.append((str(clauseelement), params, multiparams)) def cursor_execute(conn, cursor, statement, parameters, - context, executemany): + context, executemany): cursor_stmts.append((str(statement), parameters, None)) - for engine in [ - engines.testing_engine(options=dict(implicit_returning=False)), - engines.testing_engine(options=dict(implicit_returning=False, - strategy='threadlocal')), - engines.testing_engine(options=dict(implicit_returning=False)).\ - connect() - ]: + engines.testing_engine(options=dict(implicit_returning=False)), + engines.testing_engine(options=dict(implicit_returning=False, + strategy='threadlocal')), + engines.testing_engine(options=dict(implicit_returning=False)). + connect() + ]: event.listen(engine, 'before_execute', execute) event.listen(engine, 'before_cursor_execute', cursor_execute) m = MetaData(engine) t1 = Table('t1', m, - Column('c1', Integer, primary_key=True), - Column('c2', String(50), default=func.lower('Foo'), - primary_key=True) - ) + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), + primary_key=True) + ) m.create_all() try: t1.insert().execute(c1=5, c2='some data') t1.insert().execute(c1=6) - eq_(engine.execute('select * from t1').fetchall(), [(5, - 'some data'), (6, 'foo')]) + eq_( + engine.execute('select * from t1').fetchall(), + [(5, 'some data'), (6, 'foo')]) finally: m.drop_all() compiled = [('CREATE TABLE t1', {}, None), ('INSERT INTO t1 (c1, c2)', - {'c2': 'some data', 'c1': 5}, None), + {'c2': 'some data', 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', - {'c1': 6}, None), + {'c1': 6}, None), ('select * from t1', {}, None), ('DROP TABLE t1', {}, None)] @@ -1205,20 +1234,20 @@ class EngineEventsTest(fixtures.TestBase): (6, 'foo')), ('select * from t1', {}, ()), ('DROP TABLE t1', {}, ()), - ] + ] else: insert2_params = 6, 'Foo' if testing.against('oracle+zxjdbc'): insert2_params += (ReturningParam(12), ) cursor = [('CREATE TABLE t1', {}, ()), ('INSERT INTO t1 (c1, c2)', - {'c2': 'some data', 'c1': 5}, (5, 'some data')), - ('INSERT INTO t1 (c1, c2)', {'c1': 6, - 'lower_2': 'Foo'}, insert2_params), + {'c2': 'some data', 'c1': 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', + {'c1': 6, 'lower_2': 'Foo'}, insert2_params), ('select * from t1', {}, ()), ('DROP TABLE t1', {}, ())] - # bind param name 'lower_2' might - # be incorrect + # bind param name 'lower_2' might + # be incorrect self._assert_stmts(compiled, stmts) self._assert_stmts(cursor, cursor_stmts) @@ -1236,14 +1265,15 @@ class EngineEventsTest(fixtures.TestBase): event.listen(engine, 'before_cursor_execute', cursor_execute) conn = engine.connect() c2 = conn.execution_options(foo='bar') - eq_(c2._execution_options, {'foo':'bar'}) + eq_(c2._execution_options, {'foo': 'bar'}) c2.execute(select([1])) c3 = c2.execution_options(bar='bat') - eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'}) + eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'}) eq_(canary, ['execute', 'cursor_execute']) def test_retval_flag(self): canary = [] + def tracker(name): def go(conn, *args, **kw): canary.append(name) @@ -1254,7 +1284,7 @@ class EngineEventsTest(fixtures.TestBase): return clauseelement, multiparams, params def cursor_execute(conn, cursor, statement, - parameters, context, executemany): + parameters, context, executemany): canary.append('cursor_execute') return statement, parameters @@ -1266,7 +1296,8 @@ class EngineEventsTest(fixtures.TestBase): ) event.listen(engine, "before_execute", execute, retval=True) - event.listen(engine, "before_cursor_execute", cursor_execute, retval=True) + event.listen( + engine, "before_cursor_execute", cursor_execute, retval=True) engine.execute(select([1])) eq_( canary, ['execute', 'cursor_execute'] @@ -1309,25 +1340,25 @@ class EngineEventsTest(fixtures.TestBase): [call(c2, {"c1": "opt_c1"}), call(c4, {"c3": "opt_c3"})] ) - @testing.requires.sequences @testing.provide_metadata def test_cursor_execute(self): canary = [] + def tracker(name): def go(conn, cursor, statement, parameters, context, executemany): canary.append((statement, context)) return go engine = engines.testing_engine() - t = Table('t', self.metadata, - Column('x', Integer, Sequence('t_id_seq'), primary_key=True), - implicit_returning=False - ) + Column('x', Integer, Sequence('t_id_seq'), primary_key=True), + implicit_returning=False + ) self.metadata.create_all(engine) with engine.begin() as conn: - event.listen(conn, 'before_cursor_execute', tracker('cursor_execute')) + event.listen( + conn, 'before_cursor_execute', tracker('cursor_execute')) conn.execute(t.insert()) # we see the sequence pre-executed in the first call assert "t_id_seq" in canary[0][0] @@ -1339,6 +1370,7 @@ class EngineEventsTest(fixtures.TestBase): def test_transactional(self): canary = [] + def tracker(name): def go(conn, *args, **kw): canary.append(name) @@ -1346,7 +1378,8 @@ class EngineEventsTest(fixtures.TestBase): engine = engines.testing_engine() event.listen(engine, 'before_execute', tracker('execute')) - event.listen(engine, 'before_cursor_execute', tracker('cursor_execute')) + event.listen( + engine, 'before_cursor_execute', tracker('cursor_execute')) event.listen(engine, 'begin', tracker('begin')) event.listen(engine, 'commit', tracker('commit')) event.listen(engine, 'rollback', tracker('rollback')) @@ -1359,20 +1392,23 @@ class EngineEventsTest(fixtures.TestBase): conn.execute(select([1])) trans.commit() - eq_(canary, [ - 'begin', 'execute', 'cursor_execute', 'rollback', - 'begin', 'execute', 'cursor_execute', 'commit', + eq_( + canary, [ + 'begin', 'execute', 'cursor_execute', 'rollback', + 'begin', 'execute', 'cursor_execute', 'commit', ]) @testing.requires.savepoints @testing.requires.two_phase_transactions def test_transactional_advanced(self): canary1 = [] + def tracker1(name): def go(*args, **kw): canary1.append(name) return go canary2 = [] + def tracker2(name): def go(*args, **kw): canary2.append(name) @@ -1380,16 +1416,16 @@ class EngineEventsTest(fixtures.TestBase): engine = engines.testing_engine() for name in ['begin', 'savepoint', - 'rollback_savepoint', 'release_savepoint', - 'rollback', 'begin_twophase', - 'prepare_twophase', 'commit_twophase']: + 'rollback_savepoint', 'release_savepoint', + 'rollback', 'begin_twophase', + 'prepare_twophase', 'commit_twophase']: event.listen(engine, '%s' % name, tracker1(name)) conn = engine.connect() for name in ['begin', 'savepoint', - 'rollback_savepoint', 'release_savepoint', - 'rollback', 'begin_twophase', - 'prepare_twophase', 'commit_twophase']: + 'rollback_savepoint', 'release_savepoint', + 'rollback', 'begin_twophase', + 'prepare_twophase', 'commit_twophase']: event.listen(conn, '%s' % name, tracker2(name)) trans = conn.begin() @@ -1407,15 +1443,16 @@ class EngineEventsTest(fixtures.TestBase): trans.commit() eq_(canary1, ['begin', 'savepoint', - 'rollback_savepoint', 'savepoint', 'release_savepoint', - 'rollback', 'begin_twophase', - 'prepare_twophase', 'commit_twophase'] - ) + 'rollback_savepoint', 'savepoint', 'release_savepoint', + 'rollback', 'begin_twophase', + 'prepare_twophase', 'commit_twophase'] + ) eq_(canary2, ['begin', 'savepoint', - 'rollback_savepoint', 'savepoint', 'release_savepoint', - 'rollback', 'begin_twophase', - 'prepare_twophase', 'commit_twophase'] - ) + 'rollback_savepoint', 'savepoint', 'release_savepoint', + 'rollback', 'begin_twophase', + 'prepare_twophase', 'commit_twophase'] + ) + class HandleErrorTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', @@ -1446,8 +1483,10 @@ class HandleErrorTest(fixtures.TestBase): event.listen(engine, 'dbapi_error', listener) nope = Exception("nope") + class MyType(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): raise nope @@ -1456,8 +1495,8 @@ class HandleErrorTest(fixtures.TestBase): tsa.exc.StatementError, r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", conn.execute, - select([1]).where( - column('foo') == literal('bar', MyType())) + select([1]).where( + column('foo') == literal('bar', MyType())) ) # no legacy event eq_(listener.mock_calls, []) @@ -1471,11 +1510,11 @@ class HandleErrorTest(fixtures.TestBase): nope = TypeError("I'm not a DBAPI error") with engine.connect() as c: c.connection.cursor = Mock( - return_value=Mock( - execute=Mock( - side_effect=nope - )) - ) + return_value=Mock( + execute=Mock( + side_effect=nope + )) + ) assert_raises_message( TypeError, @@ -1485,7 +1524,6 @@ class HandleErrorTest(fixtures.TestBase): # no legacy event eq_(listener.mock_calls, []) - def test_handle_error(self): engine = engines.testing_engine() canary = Mock(return_value=None) @@ -1575,8 +1613,8 @@ class HandleErrorTest(fixtures.TestBase): conn = engine.connect() with patch.object(engine. - dialect.execution_ctx_cls, - "handle_dbapi_exception") as patched: + dialect.execution_ctx_cls, + "handle_dbapi_exception") as patched: assert_raises_message( MyException2, "my exception chained", @@ -1585,8 +1623,8 @@ class HandleErrorTest(fixtures.TestBase): eq_(patched.call_count, 1) with patch.object(engine. - dialect.execution_ctx_cls, - "handle_dbapi_exception") as patched: + dialect.execution_ctx_cls, + "handle_dbapi_exception") as patched: assert_raises( MyException1, conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST" @@ -1594,8 +1632,8 @@ class HandleErrorTest(fixtures.TestBase): eq_(patched.call_count, 1) with patch.object(engine. - dialect.execution_ctx_cls, - "handle_dbapi_exception") as patched: + dialect.execution_ctx_cls, + "handle_dbapi_exception") as patched: # test that non None from err1 isn't cancelled out # by err2 assert_raises( @@ -1605,8 +1643,8 @@ class HandleErrorTest(fixtures.TestBase): eq_(patched.call_count, 1) with patch.object(engine. - dialect.execution_ctx_cls, - "handle_dbapi_exception") as patched: + dialect.execution_ctx_cls, + "handle_dbapi_exception") as patched: assert_raises( tsa.exc.DBAPIError, conn.execute, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST" @@ -1614,8 +1652,8 @@ class HandleErrorTest(fixtures.TestBase): eq_(patched.call_count, 1) with patch.object(engine. - dialect.execution_ctx_cls, - "handle_dbapi_exception") as patched: + dialect.execution_ctx_cls, + "handle_dbapi_exception") as patched: assert_raises_message( MyException3, "my exception short circuit", @@ -1636,8 +1674,10 @@ class HandleErrorTest(fixtures.TestBase): event.listen(engine, 'handle_error', listener) nope = Exception("nope") + class MyType(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): raise nope @@ -1646,8 +1686,8 @@ class HandleErrorTest(fixtures.TestBase): tsa.exc.StatementError, r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", conn.execute, - select([1]).where( - column('foo') == literal('bar', MyType())) + select([1]).where( + column('foo') == literal('bar', MyType())) ) ctx = listener.mock_calls[0][1][0] @@ -1669,11 +1709,11 @@ class HandleErrorTest(fixtures.TestBase): nope = TypeError("I'm not a DBAPI error") with engine.connect() as c: c.connection.cursor = Mock( - return_value=Mock( - execute=Mock( - side_effect=nope - )) - ) + return_value=Mock( + execute=Mock( + side_effect=nope + )) + ) assert_raises_message( TypeError, @@ -1721,7 +1761,7 @@ class HandleErrorTest(fixtures.TestBase): ctx.is_disconnect = evt_value with patch.object(engine.dialect, "is_disconnect", - Mock(return_value=orig_error)): + Mock(return_value=orig_error)): with engine.connect() as c: try: @@ -1738,7 +1778,9 @@ class HandleErrorTest(fixtures.TestBase): self._test_alter_disconnect(True, False) self._test_alter_disconnect(False, False) + class ProxyConnectionTest(fixtures.TestBase): + """These are the same tests as EngineEventsTest, except using the deprecated ConnectionProxy interface. @@ -1754,6 +1796,7 @@ class ProxyConnectionTest(fixtures.TestBase): cursor_stmts = [] class MyProxy(ConnectionProxy): + def execute( self, conn, @@ -1761,7 +1804,7 @@ class ProxyConnectionTest(fixtures.TestBase): clauseelement, *multiparams, **params - ): + ): stmts.append((str(clauseelement), params, multiparams)) return execute(clauseelement, *multiparams, **params) @@ -1773,7 +1816,7 @@ class ProxyConnectionTest(fixtures.TestBase): parameters, context, executemany, - ): + ): cursor_stmts.append((str(statement), parameters, None)) return execute(cursor, statement, parameters, context) @@ -1784,99 +1827,105 @@ class ProxyConnectionTest(fixtures.TestBase): while received: teststmt, testparams, testmultiparams = \ received.pop(0) - teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', - teststmt).strip() - if teststmt.startswith(stmt) and (testparams - == params or testparams == posn): + teststmt = re.compile( + r'[\n\t ]+', re.M).sub(' ', teststmt).strip() + if teststmt.startswith(stmt) and ( + testparams == params or testparams == posn): break for engine in \ engines.testing_engine(options=dict(implicit_returning=False, - proxy=MyProxy())), \ + proxy=MyProxy())), \ engines.testing_engine(options=dict(implicit_returning=False, - proxy=MyProxy(), - strategy='threadlocal')): + proxy=MyProxy(), + strategy='threadlocal')): m = MetaData(engine) t1 = Table('t1', m, - Column('c1', Integer, primary_key=True), - Column('c2', String(50), default=func.lower('Foo'), - primary_key=True) - ) + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), + primary_key=True) + ) m.create_all() try: t1.insert().execute(c1=5, c2='some data') t1.insert().execute(c1=6) - eq_(engine.execute('select * from t1').fetchall(), [(5, - 'some data'), (6, 'foo')]) + eq_( + engine.execute('select * from t1').fetchall(), + [(5, 'some data'), (6, 'foo')]) finally: m.drop_all() engine.dispose() compiled = [('CREATE TABLE t1', {}, None), ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', - 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', - {'c1': 6}, None), ('select * from t1', {}, - None), ('DROP TABLE t1', {}, None)] + 'c1': 5}, None), + ('INSERT INTO t1 (c1, c2)', {'c1': 6}, None), + ('select * from t1', {}, None), + ('DROP TABLE t1', {}, None)] if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr # eexecute_pk_sequence # s: cursor = [ ('CREATE TABLE t1', {}, ()), - ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1' - : 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', { + 'c2': 'some data', 'c1': 5}, (5, 'some data')), ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )), ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6}, (6, 'foo')), ('select * from t1', {}, ()), ('DROP TABLE t1', {}, ()), - ] + ] else: insert2_params = 6, 'Foo' if testing.against('oracle+zxjdbc'): insert2_params += (ReturningParam(12), ) cursor = [('CREATE TABLE t1', {}, ()), - ('INSERT INTO t1 (c1, c2)', {'c2': 'some data' - , 'c1': 5}, (5, 'some data')), - ('INSERT INTO t1 (c1, c2)', {'c1': 6, - 'lower_2': 'Foo'}, insert2_params), - ('select * from t1', {}, ()), ('DROP TABLE t1' - , {}, ())] # bind param name 'lower_2' might - # be incorrect + ('INSERT INTO t1 (c1, c2)', { + 'c2': 'some data', 'c1': 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', + {'c1': 6, 'lower_2': 'Foo'}, insert2_params), + ('select * from t1', {}, ()), + ('DROP TABLE t1', {}, ())] assert_stmts(compiled, stmts) assert_stmts(cursor, cursor_stmts) @testing.uses_deprecated(r'.*Use event.listen') def test_options(self): canary = [] + class TrackProxy(ConnectionProxy): + def __getattribute__(self, key): fn = object.__getattribute__(self, key) + def go(*arg, **kw): canary.append(fn.__name__) return fn(*arg, **kw) return go - engine = engines.testing_engine(options={'proxy':TrackProxy()}) + engine = engines.testing_engine(options={'proxy': TrackProxy()}) conn = engine.connect() c2 = conn.execution_options(foo='bar') - eq_(c2._execution_options, {'foo':'bar'}) + eq_(c2._execution_options, {'foo': 'bar'}) c2.execute(select([1])) c3 = c2.execution_options(bar='bat') - eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'}) + eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'}) eq_(canary, ['execute', 'cursor_execute']) - @testing.uses_deprecated(r'.*Use event.listen') def test_transactional(self): canary = [] + class TrackProxy(ConnectionProxy): + def __getattribute__(self, key): fn = object.__getattribute__(self, key) + def go(*arg, **kw): canary.append(fn.__name__) return fn(*arg, **kw) return go - engine = engines.testing_engine(options={'proxy':TrackProxy()}) + engine = engines.testing_engine(options={'proxy': TrackProxy()}) conn = engine.connect() trans = conn.begin() conn.execute(select([1])) @@ -1885,9 +1934,10 @@ class ProxyConnectionTest(fixtures.TestBase): conn.execute(select([1])) trans.commit() - eq_(canary, [ - 'begin', 'execute', 'cursor_execute', 'rollback', - 'begin', 'execute', 'cursor_execute', 'commit', + eq_( + canary, [ + 'begin', 'execute', 'cursor_execute', 'rollback', + 'begin', 'execute', 'cursor_execute', 'commit', ]) @testing.uses_deprecated(r'.*Use event.listen') @@ -1895,15 +1945,18 @@ class ProxyConnectionTest(fixtures.TestBase): @testing.requires.two_phase_transactions def test_transactional_advanced(self): canary = [] + class TrackProxy(ConnectionProxy): + def __getattribute__(self, key): fn = object.__getattribute__(self, key) + def go(*arg, **kw): canary.append(fn.__name__) return fn(*arg, **kw) return go - engine = engines.testing_engine(options={'proxy':TrackProxy()}) + engine = engines.testing_engine(options={'proxy': TrackProxy()}) conn = engine.connect() trans = conn.begin() @@ -1922,12 +1975,14 @@ class ProxyConnectionTest(fixtures.TestBase): canary = [t for t in canary if t not in ('cursor_execute', 'execute')] eq_(canary, ['begin', 'savepoint', - 'rollback_savepoint', 'savepoint', 'release_savepoint', - 'rollback', 'begin_twophase', - 'prepare_twophase', 'commit_twophase'] - ) + 'rollback_savepoint', 'savepoint', 'release_savepoint', + 'rollback', 'begin_twophase', + 'prepare_twophase', 'commit_twophase'] + ) + class DialectEventTest(fixtures.TestBase): + @contextmanager def _run_test(self, retval): m1 = Mock() @@ -1949,9 +2004,12 @@ class DialectEventTest(fixtures.TestBase): arg[-1].get_result_proxy = Mock(return_value=Mock(context=arg[-1])) return retval - m1.real_do_execute.side_effect = m1.do_execute.side_effect = mock_the_cursor - m1.real_do_executemany.side_effect = m1.do_executemany.side_effect = mock_the_cursor - m1.real_do_execute_no_params.side_effect = m1.do_execute_no_params.side_effect = mock_the_cursor + m1.real_do_execute.side_effect = \ + m1.do_execute.side_effect = mock_the_cursor + m1.real_do_executemany.side_effect = \ + m1.do_executemany.side_effect = mock_the_cursor + m1.real_do_execute_no_params.side_effect = \ + m1.do_execute_no_params.side_effect = mock_the_cursor with e.connect() as conn: yield conn, m1 @@ -1970,22 +2028,22 @@ class DialectEventTest(fixtures.TestBase): retval, m1.do_execute, m1.real_do_execute, [call( - result.context.cursor, - "insert into table foo", - {"foo": "bar"}, result.context)] + result.context.cursor, + "insert into table foo", + {"foo": "bar"}, result.context)] ) def _test_do_executemany(self, retval): with self._run_test(retval) as (conn, m1): result = conn.execute("insert into table foo", - [{"foo": "bar"}, {"foo": "bar"}]) + [{"foo": "bar"}, {"foo": "bar"}]) self._assert( retval, m1.do_executemany, m1.real_do_executemany, [call( - result.context.cursor, - "insert into table foo", - [{"foo": "bar"}, {"foo": "bar"}], result.context)] + result.context.cursor, + "insert into table foo", + [{"foo": "bar"}, {"foo": "bar"}], result.context)] ) def _test_do_execute_no_params(self, retval): @@ -1996,8 +2054,8 @@ class DialectEventTest(fixtures.TestBase): retval, m1.do_execute_no_params, m1.real_do_execute_no_params, [call( - result.context.cursor, - "insert into table foo", result.context)] + result.context.cursor, + "insert into table foo", result.context)] ) def _test_cursor_execute(self, retval): @@ -2007,7 +2065,7 @@ class DialectEventTest(fixtures.TestBase): stmt = "insert into table foo" params = {"foo": "bar"} ctx = dialect.execution_ctx_cls._init_statement( - dialect, conn, conn.connection, stmt, [params]) + dialect, conn, conn.connection, stmt, [params]) conn._cursor_execute(ctx.cursor, stmt, params, ctx) @@ -2015,9 +2073,9 @@ class DialectEventTest(fixtures.TestBase): retval, m1.do_execute, m1.real_do_execute, [call( - ctx.cursor, - "insert into table foo", - {"foo": "bar"}, ctx)] + ctx.cursor, + "insert into table foo", + {"foo": "bar"}, ctx)] ) def test_do_execute_w_replace(self): @@ -2043,4 +2101,3 @@ class DialectEventTest(fixtures.TestBase): def test_cursor_execute_wo_replace(self): self._test_cursor_execute(False) -