_savepoint_impl(name=name)
def _rollback_to_savepoint_impl(self, name, context):
- for fn in dispatch.on_rollback_to_savepoint:
+ for fn in dispatch.on_rollback_savepoint:
fn(self, name, context)
return super(EventListenerConnection, self).\
_rollback_to_savepoint_impl(name, context)
for fn in dispatch.on_rollback_twophase:
fn(self, xid)
return super(EventListenerConnection, self).\
- _rollback_twophase_impl(xid)
+ _rollback_twophase_impl(xid, is_prepared)
def _commit_twophase_impl(self, xid, is_prepared):
for fn in dispatch.on_commit_twophase:
- fn(self, xid)
+ fn(self, xid, is_prepared)
return super(EventListenerConnection, self).\
- _commit_twophase_impl(xid)
+ _commit_twophase_impl(xid, is_prepared)
return EventListenerConnection
'echo': 'echo_pool',
'timeout': 'pool_timeout',
'recycle': 'pool_recycle',
+ 'events':'pool_events',
'use_threadlocal':'pool_threadlocal'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
logging_name=None,
reset_on_return=True,
listeners=None,
+ events=None,
_dispatch=None):
"""
Construct a Pool.
connections returned to the pool. This is typically a
ROLLBACK to release locks and transaction resources.
Disable at your own peril. Defaults to True.
-
+
+ :param events: a list of 2-tuples, each of the form
+ ``(callable, target)`` which will be passed to event.listen()
+ upon construction. Provided here so that event listeners
+ can be assigned via ``create_engine`` before dialect-level
+ listeners are applied.
+
:param listeners: Deprecated. A list of
:class:`~sqlalchemy.interfaces.PoolListener`-like objects or
dictionaries of callables that receive events when DB-API
self.echo = echo
if _dispatch:
self.dispatch.update(_dispatch, only_propagate=False)
+ if events:
+ for fn, target in events:
+ event.listen(fn, target, self)
if listeners:
util.warn_deprecated(
"The 'listeners' argument to Pool (and "
# end Py2K
from sqlalchemy import *
-from sqlalchemy import sql, exc, schema, types as sqltypes
+from sqlalchemy import sql, exc, schema, types as sqltypes, event
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.test.testing import eq_
from sqlalchemy.test import *
__only_on__ = 'mysql'
def _options(self, modes):
- class SetOptions(object):
- def first_connect(self, con, record):
- self.connect(con, record)
- def connect(self, con, record):
- cursor = con.cursor()
- cursor.execute("set sql_mode='%s'" % (",".join(modes)))
- return engines.testing_engine(options={"listeners":[SetOptions()]})
+ def connect(con, record):
+ cursor = con.cursor()
+ print "DOING THiS:", "set sql_mode='%s'" % (",".join(modes))
+ cursor.execute("set sql_mode='%s'" % (",".join(modes)))
+ e = engines.testing_engine(options={
+ 'pool_events':[
+ (connect, 'on_first_connect'),
+ (connect, 'on_connect')
+ ]
+ })
+ return e
def test_backslash_escapes(self):
engine = self._options(['NO_BACKSLASH_ESCAPES'])
def teardown(self):
metadata.drop_all()
- metadata.tables.clear()
+ metadata.clear()
if self.engine is not testing.db:
self.engine.dispose()
def test_transactional_advanced(self):
canary = []
def tracker(name):
- def go(conn, exec_, *args, **kw):
+ def go(*args, **kw):
canary.append(name)
- return exec_(*args, **kw)
return go
engine = engines.testing_engine()
[(1, ), (2, )])
connection.close()
+ # PG emergency shutdown:
+ # select * from pg_prepared_xacts
+ # ROLLBACK PREPARED '<xid>'
@testing.requires.two_phase_transactions
@testing.requires.savepoints
def test_mixed_two_phase_transaction(self):
order_by(users.c.user_id)).fetchall(),
[(1, ), (2, ), (5, )])
connection.close()
-
+
@testing.requires.two_phase_transactions
@testing.crashes('mysql+oursql',
'Times out in full test runs only, causing '
order_by(users.c.user_id))
eq_(result.fetchall(), [('user1', ), ('user4', )])
conn.close()
-
+
class AutoRollbackTest(TestBase):
@classmethod
class NoSaveCascadeFlushTest(_fixtures.FixtureTest):
"""Test related item not present in session, commit proceeds."""
+
+ run_inserts = None
@testing.resolve_artifact_names
def _one_to_many_fixture(self, o2m_cascade=True,