assert_raises(AssertionError, t.delete().execute)
finally:
engine.dialect.execution_ctx_cls = execution_ctx_cls
-
+
+ @testing.requires.python26
+ def test_rowproxy_is_sequence(self):
+ import collections
+ from sqlalchemy.engine import RowProxy
+
+ row = RowProxy(object(), ['value'], [None], {'key'
+ : (None, 0), 0: (None, 0)})
+ assert isinstance(row, collections.Sequence)
+
+ @testing.requires.cextensions
+ def test_row_c_sequence_check(self):
+ import csv
+ import collections
+ from StringIO import StringIO
+
+ metadata = MetaData()
+ metadata.bind = 'sqlite://'
+ users = Table('users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(40)),
+ )
+ users.create()
+
+ users.insert().execute(name='Test')
+ row = users.select().execute().fetchone()
+
+ s = StringIO()
+ writer = csv.writer(s)
+ # csv performs PySequenceCheck call
+ writer.writerow(row)
+ assert s.getvalue().strip() == '1,Test'
++
+class EngineEventsTest(TestBase):
+
+ def _assert_stmts(self, expected, received):
+ for stmt, params, posn in expected:
+ if not received:
+ assert False
+ while received:
+ teststmt, testparams, testmultiparams = \
+ received.pop(0)
+ teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
+ teststmt).strip()
+ if teststmt.startswith(stmt) and (testparams
+ == params or testparams == posn):
+ break
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_execute_events(self):
+
+ stmts = []
+ cursor_stmts = []
+
+ def execute(conn, clauseelement, multiparams,
+ params ):
+ stmts.append((str(clauseelement), params, multiparams))
+
+ def cursor_execute(conn, cursor, statement, parameters,
+ context, executemany):
+ cursor_stmts.append((str(statement), parameters, None))
+
+
+ for engine in [
+ engines.testing_engine(options=dict(implicit_returning=False)),
+ engines.testing_engine(options=dict(implicit_returning=False,
+ strategy='threadlocal'))
+ ]:
+ event.listen(execute, 'on_before_execute', engine)
+ event.listen(cursor_execute, 'on_before_cursor_execute', engine)
+
+ m = MetaData(engine)
+ t1 = Table('t1', m,
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'),
+ primary_key=True)
+ )
+ m.create_all()
+ try:
+ t1.insert().execute(c1=5, c2='some data')
+ t1.insert().execute(c1=6)
+ eq_(engine.execute('select * from t1').fetchall(), [(5,
+ 'some data'), (6, 'foo')])
+ finally:
+ m.drop_all()
+ engine.dispose()
+ compiled = [('CREATE TABLE t1', {}, None),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
+ 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6}, None), ('select * from t1', {},
+ None), ('DROP TABLE t1', {}, None)]
+ if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
+ # eexecute_pk_sequence
+ # s:
+ cursor = [
+ ('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
+ : 5}, (5, 'some data')),
+ ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
+ (6, 'foo')),
+ ('select * from t1', {}, ()),
+ ('DROP TABLE t1', {}, ()),
+ ]
+ else:
+ insert2_params = 6, 'Foo'
+ if testing.against('oracle+zxjdbc'):
+ insert2_params += (ReturningParam(12), )
+ cursor = [('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
+ , 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)', {'c1': 6,
+ 'lower_2': 'Foo'}, insert2_params),
+ ('select * from t1', {}, ()), ('DROP TABLE t1'
+ , {}, ())] # bind param name 'lower_2' might
+ # be incorrect
+ self._assert_stmts(compiled, stmts)
+ self._assert_stmts(cursor, cursor_stmts)
+
+ def test_options(self):
+ canary = []
+ def on_execute(conn, *args, **kw):
+ canary.append('execute')
+
+ def on_cursor_execute(conn, *args, **kw):
+ canary.append('cursor_execute')
+
+ engine = engines.testing_engine()
+ event.listen(on_execute, 'on_before_execute', engine)
+ event.listen(on_cursor_execute, 'on_before_cursor_execute', engine)
+ conn = engine.connect()
+ c2 = conn.execution_options(foo='bar')
+ eq_(c2._execution_options, {'foo':'bar'})
+ c2.execute(select([1]))
+ c3 = c2.execution_options(bar='bat')
+ eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
+ eq_(canary, ['execute', 'cursor_execute'])
+
+ def test_retval_flag(self):
+ canary = []
+ def tracker(name):
+ def go(conn, *args, **kw):
+ canary.append(name)
+ return go
+
+ def on_execute(conn, clauseelement, multiparams, params):
+ canary.append('execute')
+ return clauseelement, multiparams, params
+
+ def on_cursor_execute(conn, cursor, statement,
+ parameters, context, executemany):
+ canary.append('cursor_execute')
+ return statement, parameters
+
+ engine = engines.testing_engine()
-class ProxyConnectionTest(TestBase):
+ assert_raises(
+ tsa.exc.ArgumentError,
+ event.listen, tracker("on_begin"), "on_begin", engine, retval=True
+ )
+
+ event.listen(on_execute, "on_before_execute", engine, retval=True)
+ event.listen(on_cursor_execute, "on_before_cursor_execute", engine, retval=True)
+ engine.execute("select 1")
+ eq_(
+ canary, ['execute', 'cursor_execute']
+ )
+
+
+
+ def test_transactional(self):
+ canary = []
+ def tracker(name):
+ def go(conn, *args, **kw):
+ canary.append(name)
+ return go
+
+ engine = engines.testing_engine()
+ event.listen(tracker('execute'), 'on_before_execute', engine)
+ event.listen(tracker('cursor_execute'), 'on_before_cursor_execute', engine)
+ event.listen(tracker('begin'), 'on_begin', engine)
+ event.listen(tracker('commit'), 'on_commit', engine)
+ event.listen(tracker('rollback'), 'on_rollback', engine)
+
+ conn = engine.connect()
+ trans = conn.begin()
+ conn.execute(select([1]))
+ trans.rollback()
+ trans = conn.begin()
+ conn.execute(select([1]))
+ trans.commit()
+
+ eq_(canary, [
+ 'begin', 'execute', 'cursor_execute', 'rollback',
+ 'begin', 'execute', 'cursor_execute', 'commit',
+ ])
+
+ @testing.requires.savepoints
+ @testing.requires.two_phase_transactions
+ def test_transactional_advanced(self):
+ canary = []
+ def tracker(name):
+ def go(conn, exec_, *args, **kw):
+ canary.append(name)
+ return exec_(*args, **kw)
+ return go
+
+ engine = engines.testing_engine()
+ for name in ['begin', 'savepoint',
+ 'rollback_savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']:
+ event.listen(tracker(name), 'on_%s' % name, engine)
+
+ conn = engine.connect()
+
+ trans = conn.begin()
+ trans2 = conn.begin_nested()
+ conn.execute(select([1]))
+ trans2.rollback()
+ trans2 = conn.begin_nested()
+ conn.execute(select([1]))
+ trans2.commit()
+ trans.rollback()
+
+ trans = conn.begin_twophase()
+ conn.execute(select([1]))
+ trans.prepare()
+ trans.commit()
+ eq_(canary, ['begin', 'savepoint',
+ 'rollback_savepoint', 'savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']
+ )
+
+
+class ProxyConnectionTest(TestBase):
+ """These are the same tests as EngineEventsTest, except using
+ the deprecated ConnectionProxy interface.
+
+ """
+
+ @testing.uses_deprecated(r'.*Use event.listen')
@testing.fails_on('firebird', 'Data type unknown')
def test_proxy(self):