])
class ProxyConnectionTest(TestBase):
+
@testing.fails_on('firebird', 'Data type unknown')
def test_proxy(self):
engine = engines.testing_engine(options={'proxy':TrackProxy()})
conn = engine.connect()
trans = conn.begin()
- conn.execute("select 1")
+ conn.execute(select([1]))
trans.rollback()
trans = conn.begin()
- conn.execute("select 1")
+ conn.execute(select([1]))
trans.commit()
eq_(track, ['begin', 'execute', 'cursor_execute',
trans = conn.begin()
trans2 = conn.begin_nested()
- conn.execute("select 1")
+ conn.execute(select([1]))
trans2.rollback()
trans2 = conn.begin_nested()
- conn.execute("select 1")
+ conn.execute(select([1]))
trans2.commit()
trans.rollback()
trans = conn.begin_twophase()
- conn.execute("select 1")
+ conn.execute(select([1]))
trans.prepare()
trans.commit()
def test_cursor_iterable(self):
conn = testing.db.raw_connection()
cursor = conn.cursor()
- cursor.execute("select 1")
+ cursor.execute(select([1]).compile(testing.db))
expected = [(1,)]
for row in cursor:
eq_(row, expected.pop(0))
def test_result_closing(self):
"""tests that contextual_connect is threadlocal"""
- r1 = tlengine.execute("select 1")
- r2 = tlengine.execute("select 1")
+ r1 = tlengine.execute(select([1]))
+ r2 = tlengine.execute(select([1]))
row1 = r1.fetchone()
row2 = r2.fetchone()
r1.close()
def test_dispose(self):
eng = create_engine(testing.db.url, strategy='threadlocal')
- result = eng.execute("select 1")
+ result = eng.execute(select([1]))
eng.dispose()
- eng.execute("select 1")
+ eng.execute(select([1]))
@testing.requires.two_phase_transactions