]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- its probably worthwhile to make the primary listen() interface humane, i.e.:
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 28 Aug 2010 00:17:37 +0000 (20:17 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 28 Aug 2010 00:17:37 +0000 (20:17 -0400)
 def listen(target, args)

so here we provide a "wrapper" approach that allows this, and it is
basically pass-by-value.  a pass-by-value event *may* support rewriting
some of the args in the dictionary.

 the current
listen will become "listen_raw" since it saves about 100% overhead versus
the coercion to dict, and will be used internally, and will remain
pass-by-reference.

proxyconnection probably will rely upon the newer style of pass-by-value
for "rewrite the args" types of calls.

lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/strategies.py
lib/sqlalchemy/event.py
lib/sqlalchemy/interfaces.py
lib/sqlalchemy/orm/attributes.py
lib/sqlalchemy/orm/interfaces.py
lib/sqlalchemy/orm/mapper.py
lib/sqlalchemy/pool.py
lib/sqlalchemy/test/engines.py
test/engine/test_execute.py

index dbba2b62fe9ac0987ed327d3d4960c3b630358fb..4843d02da6c2d36c3c533d2d52d063250a0473a8 100644 (file)
@@ -1557,43 +1557,47 @@ class EngineEvents(event.Events):
                                         target.dispatch)
         event.Events.listen(fn, identifier, target)
 
+    @classmethod
+    def unwrap(cls, identifier, args):
+        return args
+
     def on_execute(self, conn, execute, clauseelement, *multiparams, **params):
         """Intercept high level execute() events."""
-
+        
     def on_cursor_execute(self, conn, execute, cursor, statement, 
                         parameters, context, executemany):
         """Intercept low-level cursor execute() events."""
 
     def on_begin(self, conn, begin):
         """Intercept begin() events."""
-
+        
     def on_rollback(self, conn, rollback):
         """Intercept rollback() events."""
-
+        
     def on_commit(self, conn, commit):
         """Intercept commit() events."""
-
+        
     def on_savepoint(self, conn, savepoint, name=None):
         """Intercept savepoint() events."""
-
+        
     def on_rollback_savepoint(self, conn, rollback_savepoint, name, context):
         """Intercept rollback_savepoint() events."""
-
+        
     def on_release_savepoint(self, conn, release_savepoint, name, context):
         """Intercept release_savepoint() events."""
-
+        
     def on_begin_twophase(self, conn, begin_twophase, xid):
         """Intercept begin_twophase() events."""
-
+        
     def on_prepare_twophase(self, conn, prepare_twophase, xid):
         """Intercept prepare_twophase() events."""
-
+        
     def on_rollback_twophase(self, conn, rollback_twophase, xid, is_prepared):
         """Intercept rollback_twophase() events."""
-
+        
     def on_commit_twophase(self, conn, commit_twophase, xid, is_prepared):
         """Intercept commit_twophase() events."""
-
+        
 class Engine(Connectable, log.Identified):
     """
     Connects a :class:`~sqlalchemy.pool.Pool` and 
@@ -1846,18 +1850,27 @@ class Engine(Connectable, log.Identified):
         return self.pool.unique_connection()
 
 def _proxy_connection_cls(cls, dispatch):
+    # TODO: this is insane.
+    # consider some different method of 
+    # event propagation / control, possibly
+    # requiring the (target, args) style of calling.
+    # arguments can simply be modified within the "args"
+    # dictionary.
+    
+    # perhaps:
+#    def execute(self, clauseelement, *multiparams, **params):
+#        for fn in dispatch.on_execute:
+#            ret = fn(clauseelement, multiparams, params)
+#            if ret:
+#               clauseelement, multiparams, params = \
+#                    ret['clauseelment'], ret['multiparams'], ret['params']
+    
     def _exec_recursive(conn, fns, orig):
         if not fns:
             return orig
         def go(*arg, **kw):
             nested = _exec_recursive(conn, fns[1:], orig)
             ret = fns[0](conn, nested, *arg, **kw)
-            # TODO: need to get consistent way to check 
-            # for "they called the fn, they didn't", or otherwise
-            # make some decision here how this is to work
-            #if ret is None:
-            #    return nested(*arg, **kw)
-            #else:
             return ret
         return go
 
index 1ef3ae6245d442fa468e52001b6fb9892b9d5a53..817c743f653fb9d892ae5f85763968358e28a02a 100644 (file)
@@ -138,13 +138,13 @@ class DefaultEngineStrategy(EngineStrategy):
                         return
                     do_on_connect(conn)
                 
-                event.listen(on_connect, 'on_first_connect', pool)
-                event.listen(on_connect, 'on_connect', pool)
+                event.listen_raw(on_connect, 'on_first_connect', pool)
+                event.listen_raw(on_connect, 'on_connect', pool)
                     
             def first_connect(dbapi_connection, connection_record):
                 c = base.Connection(engine, connection=dbapi_connection)
                 dialect.initialize(c)
-            event.listen(first_connect, 'on_first_connect', pool)
+            event.listen_raw(first_connect, 'on_first_connect', pool)
 
         return engine
 
index 21f05a1c9540e5555dc4bd8b8c1f25d34e579088..5fae8aa242551dcd4b552c17ffe8c6ac92b4e2fc 100644 (file)
@@ -6,9 +6,61 @@ and :mod:`sqlalchemy.orm` packages.
 """
 
 from sqlalchemy import util
+import inspect
 
 def listen(fn, identifier, target, *args, **kw):
-    """Listen for events, passing to fn."""
+    """Listen for events, passing to fn.
+    
+    Event listener functions are in a consistent format::
+        
+        def listen(event_name, args):
+            # ...
+            
+    Where ``event_name`` is the string name, the same 
+    as ``identifier``, and ``args`` is a dict containing
+    an entry for each argument.  The names match those 
+    of the event declaration.
+    
+    """
+
+    for evt_cls in _registrars[identifier]:
+        for tgt in evt_cls.accept_with(target):
+            fn = _create_wrapper(evt_cls, fn, identifier)
+            tgt.dispatch.listen(fn, identifier, tgt, *args, **kw)
+            break
+
+def _create_wrapper(evt_cls, fn, identifier):
+    argspec = inspect.getargspec(getattr(evt_cls, identifier))
+    arg, varargs, keywords, defaults = argspec
+    def go(*args, **kw):
+        # here we are coercing the *arg, **kw to a single 
+        # dictionary.
+        
+        # TODO: defaults
+        if keywords:
+            kw = {keywords:kw}
+        for n, v in zip(arg[1:], args):
+            kw[n] = v
+        if varargs:
+            kw[varargs] = arg[len(args)+1:]
+        
+        fn(identifier, kw)
+        
+        # then here, we ask the Events subclass to interpret
+        # the dictionary back to what it wants for a return.
+        
+        return evt_cls.unwrap(identifier, kw)
+        
+    return util.update_wrapper(go, fn)
+    
+def listen_raw(fn, identifier, target, *args, **kw):
+    """Listen for events, accepting an event function that's "raw".
+    Only the exact arguments are received in order.
+    
+    This is used by SQLA internals simply to reduce the overhead
+    of creating an event dictionary for each event call.
+    
+    """
 
     # rationale - the events on ClassManager, Session, and Mapper
     # will need to accept mapped classes directly as targets and know 
@@ -85,7 +137,9 @@ class Events(object):
     def listen(cls, fn, identifier, target):
         getattr(target.dispatch, identifier).append(fn, target)
 
-
+    @classmethod
+    def unwrap(cls, identifier, event):
+        return None
     
 class _DispatchDescriptor(object):
     """Class-level attributes on _Dispatch classes."""
index 36573bf4371d3d86824e80d574870a7a15275b61..d502afbd8c76cfe49228dd3c2e63ef4e1a2f0aa8 100644 (file)
@@ -13,7 +13,7 @@ class PoolListener(object):
     """Hooks into the lifecycle of connections in a :class:`Pool`.
 
     .. note:: :class:`PoolListener` is deprecated.   Please
-       refer to :func:`event.listen` as well as 
+       refer to :func:`event.listen_raw` as well as 
        :attr:`.Pool.events`.
     
     Usage::
@@ -75,13 +75,13 @@ class PoolListener(object):
         listener = as_interface(listener, methods=('connect',
                                 'first_connect', 'checkout', 'checkin'))
         if hasattr(listener, 'connect'):
-            event.listen(listener.connect, 'on_connect', self)
+            event.listen_raw(listener.connect, 'on_connect', self)
         if hasattr(listener, 'first_connect'):
-            event.listen(listener.first_connect, 'on_first_connect', self)
+            event.listen_raw(listener.first_connect, 'on_first_connect', self)
         if hasattr(listener, 'checkout'):
-            event.listen(listener.checkout, 'on_checkout', self)
+            event.listen_raw(listener.checkout, 'on_checkout', self)
         if hasattr(listener, 'checkin'):
-            event.listen(listener.checkin, 'on_checkin', self)
+            event.listen_raw(listener.checkin, 'on_checkin', self)
             
         
     def connect(self, dbapi_con, con_record):
@@ -146,7 +146,7 @@ class ConnectionProxy(object):
     """Allows interception of statement execution by Connections.
 
     .. note:: :class:`ConnectionProxy` is deprecated.   Please
-       refer to :func:`event.listen` as well as 
+       refer to :func:`event.listen_raw` as well as 
        :attr:`.Engine.events`.
     
     Either or both of the ``execute()`` and ``cursor_execute()``
@@ -175,29 +175,48 @@ class ConnectionProxy(object):
     
     @classmethod
     def _adapt_listener(cls, self, listener):
-        event.listen(listener.execute, 'on_execute', self)
+        # TODO: suppose if new style listeners used here.  then we say:
+        
+        # def _wrap_in_some_way(legacy_listener):
+        #   def go(clauseelement, *multiparams, **params):
+        #       # 'fake' execute function.  in reality just repopulates
+        #       # the event with the given args in case they were modified.
+        #       args.update({'clauseelement':clauseelement, 'multiparams':multiparams, 'params':params})
+        #       return args
+        #   def listen(evt, args):
+        #       return legacy_listener(args['conn'], go, args['clauseelement'], *args['multiparams'], **args['params'])
+        # 
+        # event.listen(_wrap_in_some_way(self.execute), 'on_execute', self)
+        # 
+        # that way all the complex crap is left in the legacy adapter, and the "re-execute" idea is
+        # scrapped, since it was fairly pointless.  The proxyconnection stuff in base.py can just 
+        # iterate through listeners.
+        #
+
+        event.listen_raw(listener.execute, 'on_execute', self)
         def _adapt_cursor_execute(conn, execute, cursor, statement, 
                                     parameters, context, executemany):
             def _re_execute(cursor, statement, parameters, context):
                 return execute(cursor, statement, parameters, context, executemany)
             return listener.cursor_execute(_re_execute, cursor, statement, 
                                         parameters, context, executemany)
-        event.listen(_adapt_cursor_execute, 'on_cursor_execute', self)
-        event.listen(listener.begin, 'on_begin', self)
-        event.listen(listener.rollback, 'on_rollback', self)
-        event.listen(listener.commit, 'on_commit', self)
-        event.listen(listener.savepoint, 'on_savepoint', self)
-        event.listen(listener.rollback_savepoint, 'on_rollback_savepoint', self)
-        event.listen(listener.release_savepoint, 'on_release_savepoint', self)
-        event.listen(listener.begin_twophase, 'on_begin_twophase', self)
-        event.listen(listener.prepare_twophase, 'on_prepare_twophase', self)
-        event.listen(listener.rollback_twophase, 'on_rollback_twophase', self)
-        event.listen(listener.commit_twophase, 'on_commit_twophase', self)
+        event.listen_raw(_adapt_cursor_execute, 'on_cursor_execute', self)
+        event.listen_raw(listener.begin, 'on_begin', self)
+        event.listen_raw(listener.rollback, 'on_rollback', self)
+        event.listen_raw(listener.commit, 'on_commit', self)
+        event.listen_raw(listener.savepoint, 'on_savepoint', self)
+        event.listen_raw(listener.rollback_savepoint, 'on_rollback_savepoint', self)
+        event.listen_raw(listener.release_savepoint, 'on_release_savepoint', self)
+        event.listen_raw(listener.begin_twophase, 'on_begin_twophase', self)
+        event.listen_raw(listener.prepare_twophase, 'on_prepare_twophase', self)
+        event.listen_raw(listener.rollback_twophase, 'on_rollback_twophase', self)
+        event.listen_raw(listener.commit_twophase, 'on_commit_twophase', self)
         
         
     def execute(self, conn, execute, clauseelement, *multiparams, **params):
         """Intercept high level execute() events."""
         
+        
         return execute(clauseelement, *multiparams, **params)
 
     def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
index 729968940b14bc5cc2485ad6169990b0767d642a..93ef1a6659721d1fe0dbdf38387302e4e5a9fba1 100644 (file)
@@ -131,6 +131,10 @@ class QueryableAttribute(interfaces.PropComparator):
             if active_history:
                 target.active_history = True
             event.Events.listen(fn, identifier, target)
+        
+        @classmethod
+        def unwrap(cls, identifier, event):
+            return event['value']
             
         def on_append(self, state, value, initiator):
             """Receive a collection append event.
index 77a387f845e613ad9d3d2d1e116bf7cb9985c5bd..f53fb224064cb9229b5cb036f750e7f87e520bc1 100644 (file)
@@ -880,7 +880,7 @@ class AttributeExtension(object):
     """An event handler for individual attribute change events.
     
     .. note:: :class:`AttributeExtension` is deprecated.   Please
-       refer to :func:`event.listen` as well as 
+       refer to :func:`event.listen_raw` as well as 
        :attr:`AttributeImpl.events`.
     
     AttributeExtension is assembled within the descriptors associated
@@ -895,9 +895,9 @@ class AttributeExtension(object):
 
     @classmethod
     def _adapt_listener(cls, self, listener):
-        event.listen(listener.append, 'on_append', self, active_history=listener.active_history)
-        event.listen(listener.remove, 'on_remove', self, active_history=listener.active_history)
-        event.listen(listener.set, 'on_set', self, active_history=listener.active_history)
+        event.listen_raw(listener.append, 'on_append', self, active_history=listener.active_history)
+        event.listen_raw(listener.remove, 'on_remove', self, active_history=listener.active_history)
+        event.listen_raw(listener.set, 'on_set', self, active_history=listener.active_history)
         
     
     def append(self, state, value, initiator):
index ddb08039aa01075bf1fb334abdf72fedb037930f..cb000024a4d191bdc6196a3074307e14a4387134 100644 (file)
@@ -400,14 +400,14 @@ class Mapper(object):
         if manager.info.get(_INSTRUMENTOR, False):
             return
 
-        event.listen(_event_on_init, 'on_init', manager)
-        event.listen(_event_on_init_failure, 'on_init_failure', manager)
-        event.listen(_event_on_resurrect, 'on_resurrect', manager)
+        event.listen_raw(_event_on_init, 'on_init', manager)
+        event.listen_raw(_event_on_init_failure, 'on_init_failure', manager)
+        event.listen_raw(_event_on_resurrect, 'on_resurrect', manager)
         
         for key, method in util.iterate_attributes(self.class_):
             if isinstance(method, types.FunctionType):
                 if hasattr(method, '__sa_reconstructor__'):
-                    event.listen(method, 'on_load', manager)
+                    event.listen_raw(method, 'on_load', manager)
                 elif hasattr(method, '__sa_validators__'):
                     for name in method.__sa_validators__:
                         self._validators[name] = method
@@ -415,7 +415,7 @@ class Mapper(object):
         if 'reconstruct_instance' in self.extension:
             def reconstruct(instance):
                 self.extension.reconstruct_instance(self, instance)
-            event.listen(reconstruct, 'on_load', manager)
+            event.listen_raw(reconstruct, 'on_load', manager)
 
         manager.info[_INSTRUMENTOR] = self
 
index 6c7e01c6dd300e1bcfad2a05a1c4df0cc351e6ec..5d14c178984c10344ac39d56f49c2201cb5d868c 100644 (file)
@@ -211,7 +211,7 @@ class Pool(log.Identified):
 
     dispatch = event.dispatcher(PoolEvents)
         
-    @util.deprecated("Pool.add_listener is deprecated.  Use event.listen()")
+    @util.deprecated(2.7, "Pool.add_listener is deprecated.  Use event.listen()")
     def add_listener(self, listener):
         """Add a :class:`.PoolListener`-like object to this pool.
         
index 779f872646a8c289815422f2591963df20abc636..2b1223c27daa32073ee35a4a9809b2d1399be717 100644 (file)
@@ -135,9 +135,9 @@ def testing_engine(url=None, options=None):
     options = options or config.db_opts
 
     engine = create_engine(url, **options)
-    event.listen(asserter.execute, 'on_execute', engine)
-    event.listen(asserter.cursor_execute, 'on_cursor_execute', engine)
-    event.listen(testing_reaper.checkout, 'on_checkout', engine.pool)
+    event.listen_raw(asserter.execute, 'on_execute', engine)
+    event.listen_raw(asserter.cursor_execute, 'on_cursor_execute', engine)
+    event.listen_raw(testing_reaper.checkout, 'on_checkout', engine.pool)
     
     # may want to call this, results
     # in first-connect initializers
index cacc5385aa9e1cdf28422ca9e5999e6f538aa494..329e2ea646b589741bc7fbf8b067e2e3785265e5 100644 (file)
@@ -291,8 +291,21 @@ class ResultProxyTest(TestBase):
 
 class EngineEventsTest(TestBase):
 
+    def _assert_stmts(self, expected, received):
+        for stmt, params, posn in expected:
+            if not received:
+                assert False
+            while received:
+                teststmt, testparams, testmultiparams = \
+                    received.pop(0)
+                teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
+                        teststmt).strip()
+                if teststmt.startswith(stmt) and (testparams
+                        == params or testparams == posn):
+                    break
+
     @testing.fails_on('firebird', 'Data type unknown')
-    def test_execute_events(self):
+    def test_execute_events_raw(self):
 
         stmts = []
         cursor_stmts = []
@@ -307,26 +320,14 @@ class EngineEventsTest(TestBase):
             cursor_stmts.append((str(statement), parameters, None))
             return execute(cursor, statement, parameters, context, executemany)
 
-        def assert_stmts(expected, received):
-            for stmt, params, posn in expected:
-                if not received:
-                    assert False
-                while received:
-                    teststmt, testparams, testmultiparams = \
-                        received.pop(0)
-                    teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
-                            teststmt).strip()
-                    if teststmt.startswith(stmt) and (testparams
-                            == params or testparams == posn):
-                        break
 
         for engine in [
-#            engines.testing_engine(options=dict(implicit_returning=False)), 
+            engines.testing_engine(options=dict(implicit_returning=False)), 
             engines.testing_engine(options=dict(implicit_returning=False,
                                    strategy='threadlocal'))
             ]:
-            event.listen(execute, 'on_execute', engine)
-            event.listen(cursor_execute, 'on_cursor_execute', engine)
+            event.listen_raw(execute, 'on_execute', engine)
+            event.listen_raw(cursor_execute, 'on_cursor_execute', engine)
             
             m = MetaData(engine)
             t1 = Table('t1', m, 
@@ -373,10 +374,81 @@ class EngineEventsTest(TestBase):
                           ('select * from t1', {}, ()), ('DROP TABLE t1'
                           , {}, ())]  # bind param name 'lower_2' might
                                       # be incorrect
-            assert_stmts(compiled, stmts)
-            assert_stmts(cursor, cursor_stmts)
+            self._assert_stmts(compiled, stmts)
+            self._assert_stmts(cursor, cursor_stmts)
 
-    def test_options(self):
+    @testing.fails_on('firebird', 'Data type unknown')
+    def _broken_test_execute_events_generic(self):
+
+        stmts = []
+        cursor_stmts = []
+
+        def listen(event_name, args):
+            if event_name == 'on_execute':
+                clauseelement, params, multiparams = \
+                    args['clauseelement'], args['params'], args['multiparams']
+                stmts.append((str(clauseelement), params, multiparams))
+            elif event_name == 'on_cursor_execute':
+                statement, parameters = args['statement'], args['parameters']
+                cursor_stmts.append((str(statement), parameters, None))
+
+        for engine in [
+            engines.testing_engine(options=dict(implicit_returning=False)), 
+            engines.testing_engine(options=dict(implicit_returning=False,
+                                   strategy='threadlocal'))
+            ]:
+            event.listen(listen, 'on_execute', engine)
+            event.listen(listen, 'on_cursor_execute', engine)
+
+            m = MetaData(engine)
+            t1 = Table('t1', m, 
+                Column('c1', Integer, primary_key=True), 
+                Column('c2', String(50), default=func.lower('Foo'),
+                                            primary_key=True)
+            )
+            m.create_all()
+            try:
+                t1.insert().execute(c1=5, c2='some data')
+                t1.insert().execute(c1=6)
+                eq_(engine.execute('select * from t1').fetchall(), [(5,
+                    'some data'), (6, 'foo')])
+            finally:
+                m.drop_all()
+            engine.dispose()
+            compiled = [('CREATE TABLE t1', {}, None),
+                        ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
+                        'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
+                        {'c1': 6}, None), ('select * from t1', {},
+                        None), ('DROP TABLE t1', {}, None)]
+            if not testing.against('oracle+zxjdbc'):  # or engine.dialect.pr
+                                                      # eexecute_pk_sequence
+                                                      # s:
+                cursor = [
+                    ('CREATE TABLE t1', {}, ()),
+                    ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
+                     : 5}, (5, 'some data')),
+                    ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
+                    ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
+                     (6, 'foo')),
+                    ('select * from t1', {}, ()),
+                    ('DROP TABLE t1', {}, ()),
+                    ]
+            else:
+                insert2_params = 6, 'Foo'
+                if testing.against('oracle+zxjdbc'):
+                    insert2_params += (ReturningParam(12), )
+                cursor = [('CREATE TABLE t1', {}, ()),
+                          ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
+                          , 'c1': 5}, (5, 'some data')),
+                          ('INSERT INTO t1 (c1, c2)', {'c1': 6,
+                          'lower_2': 'Foo'}, insert2_params),
+                          ('select * from t1', {}, ()), ('DROP TABLE t1'
+                          , {}, ())]  # bind param name 'lower_2' might
+                                      # be incorrect
+            self._assert_stmts(compiled, stmts)
+            self._assert_stmts(cursor, cursor_stmts)
+
+    def test_options_raw(self):
         track = []
         def on_execute(conn, exec_, *args, **kw):
             track.append('execute')
@@ -387,8 +459,8 @@ class EngineEventsTest(TestBase):
             return exec_(*args, **kw)
             
         engine = engines.testing_engine()
-        event.listen(on_execute, 'on_execute', engine)
-        event.listen(on_cursor_execute, 'on_cursor_execute', engine)
+        event.listen_raw(on_execute, 'on_execute', engine)
+        event.listen_raw(on_cursor_execute, 'on_cursor_execute', engine)
         conn = engine.connect()
         c2 = conn.execution_options(foo='bar')
         eq_(c2._execution_options, {'foo':'bar'})
@@ -398,7 +470,7 @@ class EngineEventsTest(TestBase):
         eq_(track, ['execute', 'cursor_execute'])
 
 
-    def test_transactional(self):
+    def test_transactional_raw(self):
         track = []
         def tracker(name):
             def go(conn, exec_, *args, **kw):
@@ -407,11 +479,11 @@ class EngineEventsTest(TestBase):
             return go
             
         engine = engines.testing_engine()
-        event.listen(tracker('execute'), 'on_execute', engine)
-        event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine)
-        event.listen(tracker('begin'), 'on_begin', engine)
-        event.listen(tracker('commit'), 'on_commit', engine)
-        event.listen(tracker('rollback'), 'on_rollback', engine)
+        event.listen_raw(tracker('execute'), 'on_execute', engine)
+        event.listen_raw(tracker('cursor_execute'), 'on_cursor_execute', engine)
+        event.listen_raw(tracker('begin'), 'on_begin', engine)
+        event.listen_raw(tracker('commit'), 'on_commit', engine)
+        event.listen_raw(tracker('rollback'), 'on_rollback', engine)
         
         conn = engine.connect()
         trans = conn.begin()
@@ -428,7 +500,7 @@ class EngineEventsTest(TestBase):
 
     @testing.requires.savepoints
     @testing.requires.two_phase_transactions
-    def test_transactional_advanced(self):
+    def test_transactional_advanced_raw(self):
         track = []
         def tracker(name):
             def go(conn, exec_, *args, **kw):
@@ -441,7 +513,7 @@ class EngineEventsTest(TestBase):
                     'rollback_savepoint', 'release_savepoint',
                     'rollback', 'begin_twophase', 
                        'prepare_twophase', 'commit_twophase']:
-            event.listen(tracker(name), 'on_%s' % name, engine)
+            event.listen_raw(tracker(name), 'on_%s' % name, engine)
 
         conn = engine.connect()
 
@@ -464,6 +536,7 @@ class EngineEventsTest(TestBase):
                     'rollback', 'begin_twophase', 
                        'prepare_twophase', 'commit_twophase']
         )
+    
         
 class ProxyConnectionTest(TestBase):
     """These are the same tests as EngineEventsTest, except using