for "sqlalchemy.engine" is enabled overall. Note that the
default setting of "echo" is `None`. [ticket:1554]
+ - ConnectionProxy now has wrapper methods for all transaction
+ lifecycle events, including begin(), rollback(), commit()
+ begin_nested(), begin_prepared(), prepare(), release_savepoint(),
+ etc.
+
- Connection pool logging now uses both INFO and DEBUG
log levels for logging. INFO is for major events such
as invalidated connections, DEBUG for all the acquire/return
def _cursor_executemany(self, cursor, statement, parameters, context=None):
return proxy.cursor_execute(super(ProxyConnection, self)._cursor_executemany, cursor, statement, parameters, context, True)
+ def _begin_impl(self):
+ return proxy.begin(self, super(ProxyConnection, self)._begin_impl)
+
+ def _rollback_impl(self):
+ return proxy.rollback(self, super(ProxyConnection, self)._rollback_impl)
+
+ def _commit_impl(self):
+ return proxy.commit(self, super(ProxyConnection, self)._commit_impl)
+
+ def _savepoint_impl(self, name=None):
+ return proxy.savepoint(self, super(ProxyConnection, self)._savepoint_impl, name=name)
+
+ def _rollback_to_savepoint_impl(self, name, context):
+ return proxy.rollback_savepoint(self, super(ProxyConnection, self)._rollback_to_savepoint_impl, name, context)
+
+ def _release_savepoint_impl(self, name, context):
+ return proxy.release_savepoint(self, super(ProxyConnection, self)._release_savepoint_impl, name, context)
+
+ def _begin_twophase_impl(self, xid):
+ return proxy.begin_twophase(self, super(ProxyConnection, self)._begin_twophase_impl, xid)
+
+ def _prepare_twophase_impl(self, xid):
+ return proxy.prepare_twophase(self, super(ProxyConnection, self)._prepare_twophase_impl, xid)
+
+ def _rollback_twophase_impl(self, xid, is_prepared):
+ return proxy.rollback_twophase(self, super(ProxyConnection, self)._rollback_twophase_impl, xid, is_prepared)
+
+ def _commit_twophase_impl(self, xid, is_prepared):
+ return proxy.commit_twophase(self, super(ProxyConnection, self)._commit_twophase_impl, xid, is_prepared)
+
return ProxyConnection
"""Intercept low-level cursor execute() events."""
return execute(cursor, statement, parameters, context)
-
+
+ def begin(self, conn, begin):
+ """Intercept begin() events."""
+
+ return begin()
+
+ def rollback(self, conn, rollback):
+ """Intercept rollback() events."""
+
+ return rollback()
+
+ def commit(self, conn, commit):
+ """Intercept commit() events."""
+
+ return commit()
+
+ def savepoint(self, conn, savepoint, name=None):
+ """Intercept savepoint() events."""
+
+ return savepoint(name=name)
+
+ def rollback_savepoint(self, conn, rollback_savepoint, name, context):
+ """Intercept rollback_savepoint() events."""
+
+ return rollback_savepoint(name, context)
+
+ def release_savepoint(self, conn, release_savepoint, name, context):
+ """Intercept release_savepoint() events."""
+
+ return release_savepoint(name, context)
+
+ def begin_twophase(self, conn, begin_twophase, xid):
+ """Intercept begin_twophase() events."""
+
+ return begin_twophase(xid)
+
+ def prepare_twophase(self, conn, prepare_twophase, xid):
+ """Intercept prepare_twophase() events."""
+
+ return prepare_twophase(xid)
+
+ def rollback_twophase(self, conn, rollback_twophase, xid, is_prepared):
+ """Intercept rollback_twophase() events."""
+
+ return rollback_twophase(xid, is_prepared)
+
+ def commit_twophase(self, conn, commit_twophase, xid, is_prepared):
+ """Intercept commit_twophase() events."""
+
+ return commit_twophase(xid, is_prepared)
return execute(clauseelement, *multiparams, **params)
def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
- print "CE", statement, parameters
cursor_stmts.append(
(str(statement), parameters, None)
)
assert_stmts(compiled, stmts)
assert_stmts(cursor, cursor_stmts)
+ def test_transactional(self):
+ track = []
+ class TrackProxy(ConnectionProxy):
+ def __getattribute__(self, key):
+ fn = object.__getattribute__(self, key)
+ def go(*arg, **kw):
+ track.append(fn.__name__)
+ return fn(*arg, **kw)
+ return go
+
+ engine = engines.testing_engine(options={'proxy':TrackProxy()})
+ conn = engine.connect()
+ trans = conn.begin()
+ conn.execute("select 1")
+ trans.rollback()
+ trans = conn.begin()
+ conn.execute("select 1")
+ trans.commit()
+
+ eq_(track, ['begin', 'execute', 'cursor_execute',
+ 'rollback', 'begin', 'execute', 'cursor_execute', 'commit'])
+
+ @testing.requires.savepoints
+ @testing.requires.two_phase_transactions
+ def test_transactional_advanced(self):
+ track = []
+ class TrackProxy(ConnectionProxy):
+ def __getattribute__(self, key):
+ fn = object.__getattribute__(self, key)
+ def go(*arg, **kw):
+ track.append(fn.__name__)
+ return fn(*arg, **kw)
+ return go
+
+ engine = engines.testing_engine(options={'proxy':TrackProxy()})
+ conn = engine.connect()
+
+ trans = conn.begin()
+ trans2 = conn.begin_nested()
+ conn.execute("select 1")
+ trans2.rollback()
+ trans2 = conn.begin_nested()
+ conn.execute("select 1")
+ trans2.commit()
+ trans.rollback()
+
+ trans = conn.begin_twophase()
+ conn.execute("select 1")
+ trans.prepare()
+ trans.commit()
+
+ eq_(track, ['begin', 'savepoint', 'execute', 'cursor_execute', 'execute', 'cursor_execute',
+ 'rollback_savepoint', 'execute', 'cursor_execute', 'savepoint', 'execute',
+ 'cursor_execute', 'execute', 'cursor_execute', 'release_savepoint', 'execute',
+ 'cursor_execute', 'rollback', 'begin_twophase', 'execute', 'cursor_execute',
+ 'prepare_twophase', 'execute', 'cursor_execute', 'commit_twophase',
+ 'execute', 'cursor_execute', 'execute', 'cursor_execute'])