from sqlalchemy.testing import engines
from sqlalchemy import util
from sqlalchemy.testing.engines import testing_engine
-import logging.handlers
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
from sqlalchemy.engine import result as _result, default
from sqlalchemy.engine.base import Engine
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.mock import Mock, call, patch
from contextlib import contextmanager
+import logging.handlers # needed for logging tests to work correctly
users, metadata, users_autoinc = None, None, None
+
+
class ExecuteTest(fixtures.TestBase):
__backend__ = True
def setup_class(cls):
global users, users_autoinc, metadata
metadata = MetaData(testing.db)
- users = Table('users', metadata,
+ users = Table(
+ 'users', metadata,
Column('user_id', INT, primary_key=True, autoincrement=False),
Column('user_name', VARCHAR(20)),
)
- users_autoinc = Table('users_autoinc', metadata,
- Column('user_id', INT, primary_key=True,
- test_needs_autoincrement=True),
+ users_autoinc = Table(
+ 'users_autoinc', metadata,
+ Column(
+ 'user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
"pg8000 still doesn't allow single paren without params")
def test_no_params_option(self):
stmt = "SELECT '%'" + testing.db.dialect.statement_compiler(
- testing.db.dialect, None).default_from()
+ testing.db.dialect, None).default_from()
conn = testing.db.connect()
result = conn.\
- execution_options(no_parameters=True).\
- scalar(stmt)
+ execution_options(no_parameters=True).\
+ scalar(stmt)
eq_(result, '%')
@testing.fails_on_everything_except('firebird',
(5, 'barney'),
(6, 'donkey'),
(7, 'sally'),
- ]
+ ]
for multiparam, param in [
(("jack", "fred"), {}),
((["jack", "fred"],), {})
(1, 'jack'),
(2, 'fred')
]
- res = conn.execute("select * from users where user_name=?",
+ res = conn.execute(
+ "select * from users where user_name=?",
"jack"
)
assert res.fetchall() == [(1, 'jack')]
conn.close()
# some psycopg2 versions bomb this.
- @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+pymysql',
- 'mysql+cymysql', 'mysql+mysqlconnector', 'postgresql')
+ @testing.fails_on_everything_except(
+ 'mysql+mysqldb', 'mysql+pymysql',
+ 'mysql+cymysql', 'mysql+mysqlconnector', 'postgresql')
@testing.fails_on('postgresql+zxjdbc', 'sprintf not supported')
def test_raw_sprintf(self):
def go(conn):
'values (%s, %s)', 4, 'sally')
conn.execute('insert into users (user_id) values (%s)', 5)
res = conn.execute('select * from users order by user_id')
- assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
- 'horse'), (4, 'sally'), (5, None)]
+ assert res.fetchall() == [
+ (1, 'jack'), (2, 'ed'),
+ (3, 'horse'), (4, 'sally'), (5, None)
+ ]
for multiparam, param in [
(("jack", "ed"), {}),
((["jack", "ed"],), {})
(1, 'jack'),
(2, 'ed')
]
- res = conn.execute("select * from users where user_name=%s",
+ res = conn.execute(
+ "select * from users where user_name=%s",
"jack"
)
assert res.fetchall() == [(1, 'jack')]
# versions have a bug that bombs out on this test. (1.2.2b3,
# 1.2.2c1, 1.2.2)
- @testing.skip_if(lambda : testing.against('mysql+mysqldb'),
- 'db-api flaky')
- @testing.fails_on_everything_except('postgresql+psycopg2',
- 'postgresql+pypostgresql', 'mysql+mysqlconnector',
- 'mysql+pymysql', 'mysql+cymysql')
+ @testing.skip_if(
+ lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
+ @testing.fails_on_everything_except(
+ 'postgresql+psycopg2',
+ 'postgresql+pypostgresql', 'mysql+mysqlconnector',
+ 'mysql+pymysql', 'mysql+cymysql')
def test_raw_python(self):
def go(conn):
- conn.execute('insert into users (user_id, user_name) '
- 'values (%(id)s, %(name)s)', {'id': 1, 'name'
- : 'jack'})
- conn.execute('insert into users (user_id, user_name) '
- 'values (%(id)s, %(name)s)', {'id': 2, 'name'
- : 'ed'}, {'id': 3, 'name': 'horse'})
- conn.execute('insert into users (user_id, user_name) '
- 'values (%(id)s, %(name)s)', id=4, name='sally'
- )
+ conn.execute(
+ 'insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)',
+ {'id': 1, 'name': 'jack'})
+ conn.execute(
+ 'insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)',
+ {'id': 2, 'name': 'ed'}, {'id': 3, 'name': 'horse'})
+ conn.execute(
+ 'insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)', id=4, name='sally'
+ )
res = conn.execute('select * from users order by user_id')
- assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
- 'horse'), (4, 'sally')]
+ assert res.fetchall() == [
+ (1, 'jack'), (2, 'ed'), (3, 'horse'), (4, 'sally')]
conn.execute('delete from users')
go(testing.db)
conn = testing.db.connect()
def go(conn):
conn.execute('insert into users (user_id, user_name) '
'values (:id, :name)', {'id': 1, 'name': 'jack'
- })
+ })
conn.execute('insert into users (user_id, user_name) '
'values (:id, :name)', {'id': 2, 'name': 'ed'
- }, {'id': 3, 'name': 'horse'})
+ }, {'id': 3, 'name': 'horse'})
conn.execute('insert into users (user_id, user_name) '
'values (:id, :name)', id=4, name='sally')
res = conn.execute('select * from users order by user_id')
- assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
- 'horse'), (4, 'sally')]
+ assert res.fetchall() == [
+ (1, 'jack'), (2, 'ed'), (3, 'horse'), (4, 'sally')]
conn.execute('delete from users')
go(testing.db)
- conn= testing.db.connect()
+ conn = testing.db.connect()
try:
go(conn)
finally:
with e.connect() as c:
c.connection.cursor = Mock(
- return_value=Mock(
- execute=Mock(
- side_effect=TypeError("I'm not a DBAPI error")
- ))
- )
+ return_value=Mock(
+ execute=Mock(
+ side_effect=TypeError("I'm not a DBAPI error")
+ ))
+ )
assert_raises_message(
TypeError,
conn.execute, "select 1"
)
-
def test_exception_wrapping_non_dbapi_statement(self):
class MyType(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
raise Exception("nope")
tsa.exc.StatementError,
r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
conn.execute,
- select([1]).\
- where(
- column('foo') == literal('bar', MyType())
- )
+ select([1]).
+ where(
+ column('foo') == literal('bar', MyType())
+ )
)
_go(testing.db)
conn = testing.db.connect()
"A value is required for bind parameter 'uname'"
r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k
else
- util.u(
- "A value is required for bind parameter 'uname'"
- '.*SELECT users.user_name AS .méil.')
- ,
+ util.u(
+ "A value is required for bind parameter 'uname'"
+ '.*SELECT users.user_name AS .méil.'),
conn.execute,
select([users.c.user_name.label(name)]).where(
- users.c.user_name == bindparam("uname")),
+ users.c.user_name == bindparam("uname")),
{'uname_incorrect': 'foo'}
)
def test_stmt_exception_pickleable_no_dbapi(self):
self._test_stmt_exception_pickleable(Exception("hello world"))
- @testing.crashes("postgresql+psycopg2",
- "Older versions don't support cursor pickling, newer ones do")
- @testing.fails_on("mysql+oursql",
- "Exception doesn't come back exactly the same from pickle")
- @testing.fails_on("mysql+mysqlconnector",
- "Exception doesn't come back exactly the same from pickle")
- @testing.fails_on("oracle+cx_oracle",
- "cx_oracle exception seems to be having "
- "some issue with pickling")
+ @testing.crashes(
+ "postgresql+psycopg2",
+ "Older versions don't support cursor pickling, newer ones do")
+ @testing.fails_on(
+ "mysql+oursql",
+ "Exception doesn't come back exactly the same from pickle")
+ @testing.fails_on(
+ "mysql+mysqlconnector",
+ "Exception doesn't come back exactly the same from pickle")
+ @testing.fails_on(
+ "oracle+cx_oracle",
+ "cx_oracle exception seems to be having "
+ "some issue with pickling")
def test_stmt_exception_pickleable_plus_dbapi(self):
raw = testing.db.raw_connection()
the_orig = None
def _test_stmt_exception_pickleable(self, orig):
for sa_exc in (
tsa.exc.StatementError("some error",
- "select * from table",
- {"foo":"bar"},
- orig),
+ "select * from table",
+ {"foo": "bar"},
+ orig),
tsa.exc.InterfaceError("select * from table",
- {"foo":"bar"},
- orig),
+ {"foo": "bar"},
+ orig),
tsa.exc.NoReferencedTableError("message", "tname"),
tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
- tsa.exc.CircularDependencyError("some message", [1, 2, 3], [(1, 2), (3, 4)]),
+ tsa.exc.CircularDependencyError(
+ "some message", [1, 2, 3], [(1, 2), (3, 4)]),
):
for loads, dumps in picklers():
repickled = loads(dumps(sa_exc))
eq_(repickled.args[0], sa_exc.args[0])
if isinstance(sa_exc, tsa.exc.StatementError):
- eq_(repickled.params, {"foo":"bar"})
+ eq_(repickled.params, {"foo": "bar"})
eq_(repickled.statement, sa_exc.statement)
if hasattr(sa_exc, "connection_invalidated"):
eq_(repickled.connection_invalidated,
class MyType(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
raise MyException("nope")
MyException,
"nope",
conn.execute,
- select([1]).\
- where(
- column('foo') == literal('bar', MyType())
- )
+ select([1]).
+ where(
+ column('foo') == literal('bar', MyType())
+ )
)
_go(testing.db)
conn = testing.db.connect()
"""test that execute() interprets [] as a list with no params"""
testing.db.execute(users_autoinc.insert().
- values(user_name=bindparam('name', None)), [])
+ values(user_name=bindparam('name', None)), [])
eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1, None)])
@testing.requires.ad_hoc_engines
def test_engine_level_options(self):
eng = engines.testing_engine(options={'execution_options':
- {'foo': 'bar'}})
+ {'foo': 'bar'}})
with eng.contextual_connect() as conn:
eq_(conn._execution_options['foo'], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['foo'
- ], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['bat'
- ], 'hoho')
- eq_(conn.execution_options(foo='hoho')._execution_options['foo'
- ], 'hoho')
+ eq_(
+ conn.execution_options(bat='hoho')._execution_options['foo'],
+ 'bar')
+ eq_(
+ conn.execution_options(bat='hoho')._execution_options['bat'],
+ 'hoho')
+ eq_(
+ conn.execution_options(foo='hoho')._execution_options['foo'],
+ 'hoho')
eng.update_execution_options(foo='hoho')
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
@testing.requires.ad_hoc_engines
def test_generative_engine_execution_options(self):
eng = engines.testing_engine(options={'execution_options':
- {'base': 'x1'}})
+ {'base': 'x1'}})
eng1 = eng.execution_options(foo="b1")
eng2 = eng.execution_options(foo="b2")
eng2a = eng2.execution_options(foo="b3", bar="a2")
eq_(eng._execution_options,
- {'base': 'x1'})
+ {'base': 'x1'})
eq_(eng1._execution_options,
- {'base': 'x1', 'foo': 'b1'})
+ {'base': 'x1', 'foo': 'b1'})
eq_(eng2._execution_options,
- {'base': 'x1', 'foo': 'b2'})
+ {'base': 'x1', 'foo': 'b2'})
eq_(eng1a._execution_options,
- {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
+ {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
eq_(eng2a._execution_options,
- {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
+ {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
is_(eng1a.pool, eng.pool)
# test pool is shared
@testing.requires.ad_hoc_engines
def test_generative_engine_event_dispatch(self):
canary = []
+
def l1(*arg, **kw):
canary.append("l1")
+
def l2(*arg, **kw):
canary.append("l2")
+
def l3(*arg, **kw):
canary.append("l3")
eng = engines.testing_engine(options={'execution_options':
- {'base': 'x1'}})
+ {'base': 'x1'}})
event.listen(eng, "before_execute", l1)
eng1 = eng.execution_options(foo="b1")
def test_unicode_test_fails_warning(self):
class MockCursor(engines.DBAPIProxyCursor):
+
def execute(self, stmt, params=None, **kw):
if "test unicode returns" in stmt:
raise self.engine.dialect.dbapi.DatabaseError("boom")
eq_(eng.scalar(select([1])), 1)
eng.dispose()
+
class ConvenienceExecuteTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.table = Table('exec_test', metadata,
- Column('a', Integer),
- Column('b', Integer),
- test_needs_acid=True
- )
+ Column('a', Integer),
+ Column('b', Integer),
+ test_needs_acid=True
+ )
def _trans_fn(self, is_transaction=False):
def go(conn, x, value=None):
mock_connection = Mock(
return_value=Mock(
- begin=Mock(side_effect=Exception("boom"))
- )
+ begin=Mock(side_effect=Exception("boom"))
+ )
)
engine._connection_cls = mock_connection
assert_raises(
def test_transaction_tlocal_engine_ctx_commit(self):
fn = self._trans_fn()
engine = engines.testing_engine(options=dict(
- strategy='threadlocal',
- pool=testing.db.pool))
+ strategy='threadlocal',
+ pool=testing.db.pool))
ctx = engine.begin()
testing.run_as_contextmanager(ctx, fn, 5, value=8)
self._assert_fn(5, value=8)
def test_transaction_tlocal_engine_ctx_rollback(self):
fn = self._trans_rollback_fn()
engine = engines.testing_engine(options=dict(
- strategy='threadlocal',
- pool=testing.db.pool))
+ strategy='threadlocal',
+ pool=testing.db.pool))
ctx = engine.begin()
assert_raises_message(
Exception,
)
self._assert_no_data()
+
class CompiledCacheTest(fixtures.TestBase):
__backend__ = True
global users, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
- Column('user_id', INT, primary_key=True,
- test_needs_autoincrement=True),
- Column('user_name', VARCHAR(20)),
- )
+ Column('user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_name', VARCHAR(20)),
+ )
metadata.create_all()
@engines.close_first
cached_conn = conn.execution_options(compiled_cache=cache)
ins = users.insert()
- cached_conn.execute(ins, {'user_name':'u1'})
- cached_conn.execute(ins, {'user_name':'u2'})
- cached_conn.execute(ins, {'user_name':'u3'})
+ cached_conn.execute(ins, {'user_name': 'u1'})
+ cached_conn.execute(ins, {'user_name': 'u2'})
+ cached_conn.execute(ins, {'user_name': 'u3'})
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
class MockStrategyTest(fixtures.TestBase):
+
def _engine_fixture(self):
buf = util.StringIO()
+
def dump(sql, *multiparams, **params):
buf.write(util.text_type(sql.compile(dialect=engine.dialect)))
engine = create_engine('postgresql://', strategy='mock', executor=dump)
engine, buf = self._engine_fixture()
metadata = MetaData()
t = Table('testtable', metadata,
- Column('pk', Integer, Sequence('testtable_pk_seq'), primary_key=True)
- )
+ Column(
+ 'pk', Integer, Sequence('testtable_pk_seq'), primary_key=True)
+ )
t.create(engine)
t.drop(engine)
["SEQUENCE", "TABLE"]
)
+
class ResultProxyTest(fixtures.TestBase):
__backend__ = True
from sqlalchemy.engine import RowProxy
class MyList(object):
+
def __init__(self, l):
self.l = l
def __getitem__(self, i):
return list.__getitem__(self.l, i)
- proxy = RowProxy(object(), MyList(['value']), [None], {'key'
- : (None, None, 0), 0: (None, None, 0)})
+ proxy = RowProxy(object(), MyList(['value']), [None], {
+ 'key': (None, None, 0), 0: (None, None, 0)})
eq_(list(proxy), ['value'])
eq_(proxy[0], 'value')
eq_(proxy['key'], 'value')
engine = engines.testing_engine()
t = Table('t1', metadata,
- Column('data', String(10))
- )
+ Column('data', String(10))
+ )
metadata.create_all(engine)
- with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
+ with patch.object(
+ engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
mock_rowcount.__get__ = Mock()
engine.execute(t.insert(),
- {'data': 'd1'},
- {'data': 'd2'},
- {'data': 'd3'})
+ {'data': 'd1'},
+ {'data': 'd2'},
+ {'data': 'd3'})
eq_(len(mock_rowcount.__get__.mock_calls), 0)
eq_(
- engine.execute(t.select()).fetchall(),
- [('d1', ), ('d2', ), ('d3', )]
+ engine.execute(t.select()).fetchall(),
+ [('d1', ), ('d2', ), ('d3', )]
)
eq_(len(mock_rowcount.__get__.mock_calls), 0)
engine.execute(t.delete())
eq_(len(mock_rowcount.__get__.mock_calls), 2)
-
def test_rowproxy_is_sequence(self):
import collections
from sqlalchemy.engine import RowProxy
- row = RowProxy(object(), ['value'], [None], {'key'
- : (None, None, 0), 0: (None, None, 0)})
+ row = RowProxy(
+ object(), ['value'], [None],
+ {'key': (None, None, 0), 0: (None, None, 0)})
assert isinstance(row, collections.Sequence)
@testing.requires.cextensions
def test_row_c_sequence_check(self):
import csv
- import collections
metadata = MetaData()
metadata.bind = 'sqlite://'
users = Table('users', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(40)),
- )
+ Column('id', Integer, primary_key=True),
+ Column('name', String(40)),
+ )
users.create()
users.insert().execute(name='Test')
lambda r: r.last_updated_params(),
lambda r: r.prefetch_cols(),
lambda r: r.postfetch_cols(),
- lambda r : r.inserted_primary_key
+ lambda r: r.inserted_primary_key
],
"Statement is not a compiled expression construct."
),
select([1]),
[
lambda r: r.last_inserted_params(),
- lambda r : r.inserted_primary_key
+ lambda r: r.inserted_primary_key
],
r"Statement is not an insert\(\) expression construct."
),
select([1]),
[
lambda r: r.prefetch_cols(),
- lambda r : r.postfetch_cols()
+ lambda r: r.postfetch_cols()
],
r"Statement is not an insert\(\) "
r"or update\(\) expression construct."
finally:
r.close()
+
class ExecutionOptionsTest(fixtures.TestBase):
+
def test_dialect_conn_options(self):
engine = testing_engine("sqlite://", options=dict(_initialize=False))
engine.dialect = Mock()
def test_dialect_engine_construction_options(self):
dialect = Mock()
engine = Engine(Mock(), dialect, Mock(),
- execution_options={"foo": "bar"})
+ execution_options={"foo": "bar"})
eq_(
dialect.set_engine_execution_options.mock_calls,
[call(engine, {"foo": "bar"})]
def test_propagate_engine_to_connection(self):
engine = testing_engine("sqlite://",
- options=dict(execution_options={"foo": "bar"}))
+ options=dict(execution_options={"foo": "bar"}))
conn = engine.connect()
eq_(conn._execution_options, {"foo": "bar"})
def test_propagate_option_engine_to_connection(self):
e1 = testing_engine("sqlite://",
- options=dict(execution_options={"foo": "bar"}))
+ options=dict(execution_options={"foo": "bar"}))
e2 = e1.execution_options(bat="hoho")
c1 = e1.connect()
c2 = e2.connect()
eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"})
-
-
-
class AlternateResultProxyTest(fixtures.TestBase):
__requires__ = ('sqlite', )
@classmethod
def setup_class(cls):
- from sqlalchemy.engine import base, default
cls.engine = engine = testing_engine('sqlite://')
m = MetaData()
cls.table = t = Table('test', m,
- Column('x', Integer, primary_key=True),
- Column('y', String(50, convert_unicode='force'))
- )
+ Column('x', Integer, primary_key=True),
+ Column('y', String(50, convert_unicode='force'))
+ )
m.create_all(engine)
engine.execute(t.insert(), [
- {'x':i, 'y':"t_%d" % i} for i in range(1, 12)
+ {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
])
def _test_proxy(self, cls):
class ExcCtx(default.DefaultExecutionContext):
+
def get_result_proxy(self):
return cls(self)
self.engine.dialect.execution_ctx_cls = ExcCtx
def test_buffered_column_result_proxy(self):
self._test_proxy(_result.BufferedColumnResultProxy)
+
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
__backend__ = True
teststmt, testparams, testmultiparams = \
received.pop(0)
teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
- teststmt).strip()
- if teststmt.startswith(stmt) and (testparams
- == params or testparams == posn):
+ teststmt).strip()
+ if teststmt.startswith(stmt) and (
+ testparams == params or testparams == posn):
break
def test_per_engine_independence(self):
e2.execute(s2)
eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
-
def test_per_engine_plus_global(self):
canary = Mock()
event.listen(Engine, "before_execute", canary.be1)
event.listen(e1, "before_execute", canary.be1)
conn = e1._connection_cls(e1, connection=e1.raw_connection(),
- _has_events=False)
+ _has_events=False)
conn.execute(select([1]))
dialect = conn.dialect
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, {})
+ dialect, conn, conn.connection, stmt, {})
ctx._execute_scalar(stmt, Integer())
eq_(canary.bce.mock_calls,
- [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
+ [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
eq_(canary.ace.mock_calls,
- [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
+ [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
def test_cursor_events_execute(self):
canary = Mock()
ctx = result.context
eq_(canary.bce.mock_calls,
- [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
+ [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
eq_(canary.ace.mock_calls,
- [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
-
+ [call(conn, ctx.cursor, stmt, ctx.parameters[0], ctx, False)])
def test_argument_format_execute(self):
def before_execute(conn, clauseelement, multiparams, params):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
+
def after_execute(conn, clauseelement, multiparams, params, result):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
e1.execute(select([1]).compile(dialect=e1.dialect))
e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {})
-
-
-
@testing.fails_on('firebird', 'Data type unknown')
def test_execute_events(self):
cursor_stmts = []
def execute(conn, clauseelement, multiparams,
- params ):
+ params):
stmts.append((str(clauseelement), params, multiparams))
def cursor_execute(conn, cursor, statement, parameters,
- context, executemany):
+ context, executemany):
cursor_stmts.append((str(statement), parameters, None))
-
for engine in [
- engines.testing_engine(options=dict(implicit_returning=False)),
- engines.testing_engine(options=dict(implicit_returning=False,
- strategy='threadlocal')),
- engines.testing_engine(options=dict(implicit_returning=False)).\
- connect()
- ]:
+ engines.testing_engine(options=dict(implicit_returning=False)),
+ engines.testing_engine(options=dict(implicit_returning=False,
+ strategy='threadlocal')),
+ engines.testing_engine(options=dict(implicit_returning=False)).
+ connect()
+ ]:
event.listen(engine, 'before_execute', execute)
event.listen(engine, 'before_cursor_execute', cursor_execute)
m = MetaData(engine)
t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(50), default=func.lower('Foo'),
- primary_key=True)
- )
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'),
+ primary_key=True)
+ )
m.create_all()
try:
t1.insert().execute(c1=5, c2='some data')
t1.insert().execute(c1=6)
- eq_(engine.execute('select * from t1').fetchall(), [(5,
- 'some data'), (6, 'foo')])
+ eq_(
+ engine.execute('select * from t1').fetchall(),
+ [(5, 'some data'), (6, 'foo')])
finally:
m.drop_all()
compiled = [('CREATE TABLE t1', {}, None),
('INSERT INTO t1 (c1, c2)',
- {'c2': 'some data', 'c1': 5}, None),
+ {'c2': 'some data', 'c1': 5}, None),
('INSERT INTO t1 (c1, c2)',
- {'c1': 6}, None),
+ {'c1': 6}, None),
('select * from t1', {}, None),
('DROP TABLE t1', {}, None)]
(6, 'foo')),
('select * from t1', {}, ()),
('DROP TABLE t1', {}, ()),
- ]
+ ]
else:
insert2_params = 6, 'Foo'
if testing.against('oracle+zxjdbc'):
insert2_params += (ReturningParam(12), )
cursor = [('CREATE TABLE t1', {}, ()),
('INSERT INTO t1 (c1, c2)',
- {'c2': 'some data', 'c1': 5}, (5, 'some data')),
- ('INSERT INTO t1 (c1, c2)', {'c1': 6,
- 'lower_2': 'Foo'}, insert2_params),
+ {'c2': 'some data', 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6, 'lower_2': 'Foo'}, insert2_params),
('select * from t1', {}, ()),
('DROP TABLE t1', {}, ())]
- # bind param name 'lower_2' might
- # be incorrect
+ # bind param name 'lower_2' might
+ # be incorrect
self._assert_stmts(compiled, stmts)
self._assert_stmts(cursor, cursor_stmts)
event.listen(engine, 'before_cursor_execute', cursor_execute)
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
- eq_(c2._execution_options, {'foo':'bar'})
+ eq_(c2._execution_options, {'foo': 'bar'})
c2.execute(select([1]))
c3 = c2.execution_options(bar='bat')
- eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
+ eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'})
eq_(canary, ['execute', 'cursor_execute'])
def test_retval_flag(self):
canary = []
+
def tracker(name):
def go(conn, *args, **kw):
canary.append(name)
return clauseelement, multiparams, params
def cursor_execute(conn, cursor, statement,
- parameters, context, executemany):
+ parameters, context, executemany):
canary.append('cursor_execute')
return statement, parameters
)
event.listen(engine, "before_execute", execute, retval=True)
- event.listen(engine, "before_cursor_execute", cursor_execute, retval=True)
+ event.listen(
+ engine, "before_cursor_execute", cursor_execute, retval=True)
engine.execute(select([1]))
eq_(
canary, ['execute', 'cursor_execute']
[call(c2, {"c1": "opt_c1"}), call(c4, {"c3": "opt_c3"})]
)
-
@testing.requires.sequences
@testing.provide_metadata
def test_cursor_execute(self):
canary = []
+
def tracker(name):
def go(conn, cursor, statement, parameters, context, executemany):
canary.append((statement, context))
return go
engine = engines.testing_engine()
-
t = Table('t', self.metadata,
- Column('x', Integer, Sequence('t_id_seq'), primary_key=True),
- implicit_returning=False
- )
+ Column('x', Integer, Sequence('t_id_seq'), primary_key=True),
+ implicit_returning=False
+ )
self.metadata.create_all(engine)
with engine.begin() as conn:
- event.listen(conn, 'before_cursor_execute', tracker('cursor_execute'))
+ event.listen(
+ conn, 'before_cursor_execute', tracker('cursor_execute'))
conn.execute(t.insert())
# we see the sequence pre-executed in the first call
assert "t_id_seq" in canary[0][0]
def test_transactional(self):
canary = []
+
def tracker(name):
def go(conn, *args, **kw):
canary.append(name)
engine = engines.testing_engine()
event.listen(engine, 'before_execute', tracker('execute'))
- event.listen(engine, 'before_cursor_execute', tracker('cursor_execute'))
+ event.listen(
+ engine, 'before_cursor_execute', tracker('cursor_execute'))
event.listen(engine, 'begin', tracker('begin'))
event.listen(engine, 'commit', tracker('commit'))
event.listen(engine, 'rollback', tracker('rollback'))
conn.execute(select([1]))
trans.commit()
- eq_(canary, [
- 'begin', 'execute', 'cursor_execute', 'rollback',
- 'begin', 'execute', 'cursor_execute', 'commit',
+ eq_(
+ canary, [
+ 'begin', 'execute', 'cursor_execute', 'rollback',
+ 'begin', 'execute', 'cursor_execute', 'commit',
])
@testing.requires.savepoints
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
canary1 = []
+
def tracker1(name):
def go(*args, **kw):
canary1.append(name)
return go
canary2 = []
+
def tracker2(name):
def go(*args, **kw):
canary2.append(name)
engine = engines.testing_engine()
for name in ['begin', 'savepoint',
- 'rollback_savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
- 'prepare_twophase', 'commit_twophase']:
+ 'rollback_savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']:
event.listen(engine, '%s' % name, tracker1(name))
conn = engine.connect()
for name in ['begin', 'savepoint',
- 'rollback_savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
- 'prepare_twophase', 'commit_twophase']:
+ 'rollback_savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']:
event.listen(conn, '%s' % name, tracker2(name))
trans = conn.begin()
trans.commit()
eq_(canary1, ['begin', 'savepoint',
- 'rollback_savepoint', 'savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
- 'prepare_twophase', 'commit_twophase']
- )
+ 'rollback_savepoint', 'savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']
+ )
eq_(canary2, ['begin', 'savepoint',
- 'rollback_savepoint', 'savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
- 'prepare_twophase', 'commit_twophase']
- )
+ 'rollback_savepoint', 'savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']
+ )
+
class HandleErrorTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
event.listen(engine, 'dbapi_error', listener)
nope = Exception("nope")
+
class MyType(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
raise nope
tsa.exc.StatementError,
r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
conn.execute,
- select([1]).where(
- column('foo') == literal('bar', MyType()))
+ select([1]).where(
+ column('foo') == literal('bar', MyType()))
)
# no legacy event
eq_(listener.mock_calls, [])
nope = TypeError("I'm not a DBAPI error")
with engine.connect() as c:
c.connection.cursor = Mock(
- return_value=Mock(
- execute=Mock(
- side_effect=nope
- ))
- )
+ return_value=Mock(
+ execute=Mock(
+ side_effect=nope
+ ))
+ )
assert_raises_message(
TypeError,
# no legacy event
eq_(listener.mock_calls, [])
-
def test_handle_error(self):
engine = engines.testing_engine()
canary = Mock(return_value=None)
conn = engine.connect()
with patch.object(engine.
- dialect.execution_ctx_cls,
- "handle_dbapi_exception") as patched:
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
assert_raises_message(
MyException2,
"my exception chained",
eq_(patched.call_count, 1)
with patch.object(engine.
- dialect.execution_ctx_cls,
- "handle_dbapi_exception") as patched:
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
assert_raises(
MyException1,
conn.execute, "SELECT 'ERROR TWO' FROM I_DONT_EXIST"
eq_(patched.call_count, 1)
with patch.object(engine.
- dialect.execution_ctx_cls,
- "handle_dbapi_exception") as patched:
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
# test that non None from err1 isn't cancelled out
# by err2
assert_raises(
eq_(patched.call_count, 1)
with patch.object(engine.
- dialect.execution_ctx_cls,
- "handle_dbapi_exception") as patched:
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
assert_raises(
tsa.exc.DBAPIError,
conn.execute, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST"
eq_(patched.call_count, 1)
with patch.object(engine.
- dialect.execution_ctx_cls,
- "handle_dbapi_exception") as patched:
+ dialect.execution_ctx_cls,
+ "handle_dbapi_exception") as patched:
assert_raises_message(
MyException3,
"my exception short circuit",
event.listen(engine, 'handle_error', listener)
nope = Exception("nope")
+
class MyType(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
raise nope
tsa.exc.StatementError,
r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
conn.execute,
- select([1]).where(
- column('foo') == literal('bar', MyType()))
+ select([1]).where(
+ column('foo') == literal('bar', MyType()))
)
ctx = listener.mock_calls[0][1][0]
nope = TypeError("I'm not a DBAPI error")
with engine.connect() as c:
c.connection.cursor = Mock(
- return_value=Mock(
- execute=Mock(
- side_effect=nope
- ))
- )
+ return_value=Mock(
+ execute=Mock(
+ side_effect=nope
+ ))
+ )
assert_raises_message(
TypeError,
ctx.is_disconnect = evt_value
with patch.object(engine.dialect, "is_disconnect",
- Mock(return_value=orig_error)):
+ Mock(return_value=orig_error)):
with engine.connect() as c:
try:
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+
class ProxyConnectionTest(fixtures.TestBase):
+
"""These are the same tests as EngineEventsTest, except using
the deprecated ConnectionProxy interface.
cursor_stmts = []
class MyProxy(ConnectionProxy):
+
def execute(
self,
conn,
clauseelement,
*multiparams,
**params
- ):
+ ):
stmts.append((str(clauseelement), params, multiparams))
return execute(clauseelement, *multiparams, **params)
parameters,
context,
executemany,
- ):
+ ):
cursor_stmts.append((str(statement), parameters, None))
return execute(cursor, statement, parameters, context)
while received:
teststmt, testparams, testmultiparams = \
received.pop(0)
- teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
- teststmt).strip()
- if teststmt.startswith(stmt) and (testparams
- == params or testparams == posn):
+ teststmt = re.compile(
+ r'[\n\t ]+', re.M).sub(' ', teststmt).strip()
+ if teststmt.startswith(stmt) and (
+ testparams == params or testparams == posn):
break
for engine in \
engines.testing_engine(options=dict(implicit_returning=False,
- proxy=MyProxy())), \
+ proxy=MyProxy())), \
engines.testing_engine(options=dict(implicit_returning=False,
- proxy=MyProxy(),
- strategy='threadlocal')):
+ proxy=MyProxy(),
+ strategy='threadlocal')):
m = MetaData(engine)
t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(50), default=func.lower('Foo'),
- primary_key=True)
- )
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'),
+ primary_key=True)
+ )
m.create_all()
try:
t1.insert().execute(c1=5, c2='some data')
t1.insert().execute(c1=6)
- eq_(engine.execute('select * from t1').fetchall(), [(5,
- 'some data'), (6, 'foo')])
+ eq_(
+ engine.execute('select * from t1').fetchall(),
+ [(5, 'some data'), (6, 'foo')])
finally:
m.drop_all()
engine.dispose()
compiled = [('CREATE TABLE t1', {}, None),
('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
- 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
- {'c1': 6}, None), ('select * from t1', {},
- None), ('DROP TABLE t1', {}, None)]
+ 'c1': 5}, None),
+ ('INSERT INTO t1 (c1, c2)', {'c1': 6}, None),
+ ('select * from t1', {}, None),
+ ('DROP TABLE t1', {}, None)]
if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
# eexecute_pk_sequence
# s:
cursor = [
('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
- : 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)', {
+ 'c2': 'some data', 'c1': 5}, (5, 'some data')),
('SELECT lower', {'lower_2': 'Foo'},
('Foo', )),
('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
(6, 'foo')),
('select * from t1', {}, ()),
('DROP TABLE t1', {}, ()),
- ]
+ ]
else:
insert2_params = 6, 'Foo'
if testing.against('oracle+zxjdbc'):
insert2_params += (ReturningParam(12), )
cursor = [('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
- , 'c1': 5}, (5, 'some data')),
- ('INSERT INTO t1 (c1, c2)', {'c1': 6,
- 'lower_2': 'Foo'}, insert2_params),
- ('select * from t1', {}, ()), ('DROP TABLE t1'
- , {}, ())] # bind param name 'lower_2' might
- # be incorrect
+ ('INSERT INTO t1 (c1, c2)', {
+ 'c2': 'some data', 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6, 'lower_2': 'Foo'}, insert2_params),
+ ('select * from t1', {}, ()),
+ ('DROP TABLE t1', {}, ())]
assert_stmts(compiled, stmts)
assert_stmts(cursor, cursor_stmts)
@testing.uses_deprecated(r'.*Use event.listen')
def test_options(self):
canary = []
+
class TrackProxy(ConnectionProxy):
+
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
+
def go(*arg, **kw):
canary.append(fn.__name__)
return fn(*arg, **kw)
return go
- engine = engines.testing_engine(options={'proxy':TrackProxy()})
+ engine = engines.testing_engine(options={'proxy': TrackProxy()})
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
- eq_(c2._execution_options, {'foo':'bar'})
+ eq_(c2._execution_options, {'foo': 'bar'})
c2.execute(select([1]))
c3 = c2.execution_options(bar='bat')
- eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
+ eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'})
eq_(canary, ['execute', 'cursor_execute'])
-
@testing.uses_deprecated(r'.*Use event.listen')
def test_transactional(self):
canary = []
+
class TrackProxy(ConnectionProxy):
+
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
+
def go(*arg, **kw):
canary.append(fn.__name__)
return fn(*arg, **kw)
return go
- engine = engines.testing_engine(options={'proxy':TrackProxy()})
+ engine = engines.testing_engine(options={'proxy': TrackProxy()})
conn = engine.connect()
trans = conn.begin()
conn.execute(select([1]))
conn.execute(select([1]))
trans.commit()
- eq_(canary, [
- 'begin', 'execute', 'cursor_execute', 'rollback',
- 'begin', 'execute', 'cursor_execute', 'commit',
+ eq_(
+ canary, [
+ 'begin', 'execute', 'cursor_execute', 'rollback',
+ 'begin', 'execute', 'cursor_execute', 'commit',
])
@testing.uses_deprecated(r'.*Use event.listen')
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
canary = []
+
class TrackProxy(ConnectionProxy):
+
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
+
def go(*arg, **kw):
canary.append(fn.__name__)
return fn(*arg, **kw)
return go
- engine = engines.testing_engine(options={'proxy':TrackProxy()})
+ engine = engines.testing_engine(options={'proxy': TrackProxy()})
conn = engine.connect()
trans = conn.begin()
canary = [t for t in canary if t not in ('cursor_execute', 'execute')]
eq_(canary, ['begin', 'savepoint',
- 'rollback_savepoint', 'savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
- 'prepare_twophase', 'commit_twophase']
- )
+ 'rollback_savepoint', 'savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']
+ )
+
class DialectEventTest(fixtures.TestBase):
+
@contextmanager
def _run_test(self, retval):
m1 = Mock()
arg[-1].get_result_proxy = Mock(return_value=Mock(context=arg[-1]))
return retval
- m1.real_do_execute.side_effect = m1.do_execute.side_effect = mock_the_cursor
- m1.real_do_executemany.side_effect = m1.do_executemany.side_effect = mock_the_cursor
- m1.real_do_execute_no_params.side_effect = m1.do_execute_no_params.side_effect = mock_the_cursor
+ m1.real_do_execute.side_effect = \
+ m1.do_execute.side_effect = mock_the_cursor
+ m1.real_do_executemany.side_effect = \
+ m1.do_executemany.side_effect = mock_the_cursor
+ m1.real_do_execute_no_params.side_effect = \
+ m1.do_execute_no_params.side_effect = mock_the_cursor
with e.connect() as conn:
yield conn, m1
retval,
m1.do_execute, m1.real_do_execute,
[call(
- result.context.cursor,
- "insert into table foo",
- {"foo": "bar"}, result.context)]
+ result.context.cursor,
+ "insert into table foo",
+ {"foo": "bar"}, result.context)]
)
def _test_do_executemany(self, retval):
with self._run_test(retval) as (conn, m1):
result = conn.execute("insert into table foo",
- [{"foo": "bar"}, {"foo": "bar"}])
+ [{"foo": "bar"}, {"foo": "bar"}])
self._assert(
retval,
m1.do_executemany, m1.real_do_executemany,
[call(
- result.context.cursor,
- "insert into table foo",
- [{"foo": "bar"}, {"foo": "bar"}], result.context)]
+ result.context.cursor,
+ "insert into table foo",
+ [{"foo": "bar"}, {"foo": "bar"}], result.context)]
)
def _test_do_execute_no_params(self, retval):
retval,
m1.do_execute_no_params, m1.real_do_execute_no_params,
[call(
- result.context.cursor,
- "insert into table foo", result.context)]
+ result.context.cursor,
+ "insert into table foo", result.context)]
)
def _test_cursor_execute(self, retval):
stmt = "insert into table foo"
params = {"foo": "bar"}
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, [params])
+ dialect, conn, conn.connection, stmt, [params])
conn._cursor_execute(ctx.cursor, stmt, params, ctx)
retval,
m1.do_execute, m1.real_do_execute,
[call(
- ctx.cursor,
- "insert into table foo",
- {"foo": "bar"}, ctx)]
+ ctx.cursor,
+ "insert into table foo",
+ {"foo": "bar"}, ctx)]
)
def test_do_execute_w_replace(self):
def test_cursor_execute_wo_replace(self):
self._test_cursor_execute(False)
-