From 63c1800c568824b6828ac791f83fd2bf7626adcc Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 27 Aug 2010 20:17:37 -0400 Subject: [PATCH] - its probably worthwhile to make the primary listen() interface humane, i.e.: def listen(target, args) so here we provide a "wrapper" approach that allows this, and it is basically pass-by-value. a pass-by-value event *may* support rewriting some of the args in the dictionary. the current listen will become "listen_raw" since it saves about 100% overhead versus the coercion to dict, and will be used internally, and will remain pass-by-reference. proxyconnection probably will rely upon the newer style of pass-by-value for "rewrite the args" types of calls. --- lib/sqlalchemy/engine/base.py | 47 ++++++---- lib/sqlalchemy/engine/strategies.py | 6 +- lib/sqlalchemy/event.py | 58 +++++++++++- lib/sqlalchemy/interfaces.py | 55 ++++++++---- lib/sqlalchemy/orm/attributes.py | 4 + lib/sqlalchemy/orm/interfaces.py | 8 +- lib/sqlalchemy/orm/mapper.py | 10 +-- lib/sqlalchemy/pool.py | 2 +- lib/sqlalchemy/test/engines.py | 6 +- test/engine/test_execute.py | 131 ++++++++++++++++++++++------ 10 files changed, 245 insertions(+), 82 deletions(-) diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index dbba2b62fe..4843d02da6 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1557,43 +1557,47 @@ class EngineEvents(event.Events): target.dispatch) event.Events.listen(fn, identifier, target) + @classmethod + def unwrap(cls, identifier, args): + return args + def on_execute(self, conn, execute, clauseelement, *multiparams, **params): """Intercept high level execute() events.""" - + def on_cursor_execute(self, conn, execute, cursor, statement, parameters, context, executemany): """Intercept low-level cursor execute() events.""" def on_begin(self, conn, begin): """Intercept begin() events.""" - + def on_rollback(self, conn, rollback): """Intercept rollback() events.""" - + def on_commit(self, conn, commit): """Intercept commit() events.""" - + def on_savepoint(self, conn, savepoint, name=None): """Intercept savepoint() events.""" - + def on_rollback_savepoint(self, conn, rollback_savepoint, name, context): """Intercept rollback_savepoint() events.""" - + def on_release_savepoint(self, conn, release_savepoint, name, context): """Intercept release_savepoint() events.""" - + def on_begin_twophase(self, conn, begin_twophase, xid): """Intercept begin_twophase() events.""" - + def on_prepare_twophase(self, conn, prepare_twophase, xid): """Intercept prepare_twophase() events.""" - + def on_rollback_twophase(self, conn, rollback_twophase, xid, is_prepared): """Intercept rollback_twophase() events.""" - + def on_commit_twophase(self, conn, commit_twophase, xid, is_prepared): """Intercept commit_twophase() events.""" - + class Engine(Connectable, log.Identified): """ Connects a :class:`~sqlalchemy.pool.Pool` and @@ -1846,18 +1850,27 @@ class Engine(Connectable, log.Identified): return self.pool.unique_connection() def _proxy_connection_cls(cls, dispatch): + # TODO: this is insane. + # consider some different method of + # event propagation / control, possibly + # requiring the (target, args) style of calling. + # arguments can simply be modified within the "args" + # dictionary. + + # perhaps: +# def execute(self, clauseelement, *multiparams, **params): +# for fn in dispatch.on_execute: +# ret = fn(clauseelement, multiparams, params) +# if ret: +# clauseelement, multiparams, params = \ +# ret['clauseelment'], ret['multiparams'], ret['params'] + def _exec_recursive(conn, fns, orig): if not fns: return orig def go(*arg, **kw): nested = _exec_recursive(conn, fns[1:], orig) ret = fns[0](conn, nested, *arg, **kw) - # TODO: need to get consistent way to check - # for "they called the fn, they didn't", or otherwise - # make some decision here how this is to work - #if ret is None: - # return nested(*arg, **kw) - #else: return ret return go diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index 1ef3ae6245..817c743f65 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -138,13 +138,13 @@ class DefaultEngineStrategy(EngineStrategy): return do_on_connect(conn) - event.listen(on_connect, 'on_first_connect', pool) - event.listen(on_connect, 'on_connect', pool) + event.listen_raw(on_connect, 'on_first_connect', pool) + event.listen_raw(on_connect, 'on_connect', pool) def first_connect(dbapi_connection, connection_record): c = base.Connection(engine, connection=dbapi_connection) dialect.initialize(c) - event.listen(first_connect, 'on_first_connect', pool) + event.listen_raw(first_connect, 'on_first_connect', pool) return engine diff --git a/lib/sqlalchemy/event.py b/lib/sqlalchemy/event.py index 21f05a1c95..5fae8aa242 100644 --- a/lib/sqlalchemy/event.py +++ b/lib/sqlalchemy/event.py @@ -6,9 +6,61 @@ and :mod:`sqlalchemy.orm` packages. """ from sqlalchemy import util +import inspect def listen(fn, identifier, target, *args, **kw): - """Listen for events, passing to fn.""" + """Listen for events, passing to fn. + + Event listener functions are in a consistent format:: + + def listen(event_name, args): + # ... + + Where ``event_name`` is the string name, the same + as ``identifier``, and ``args`` is a dict containing + an entry for each argument. The names match those + of the event declaration. + + """ + + for evt_cls in _registrars[identifier]: + for tgt in evt_cls.accept_with(target): + fn = _create_wrapper(evt_cls, fn, identifier) + tgt.dispatch.listen(fn, identifier, tgt, *args, **kw) + break + +def _create_wrapper(evt_cls, fn, identifier): + argspec = inspect.getargspec(getattr(evt_cls, identifier)) + arg, varargs, keywords, defaults = argspec + def go(*args, **kw): + # here we are coercing the *arg, **kw to a single + # dictionary. + + # TODO: defaults + if keywords: + kw = {keywords:kw} + for n, v in zip(arg[1:], args): + kw[n] = v + if varargs: + kw[varargs] = arg[len(args)+1:] + + fn(identifier, kw) + + # then here, we ask the Events subclass to interpret + # the dictionary back to what it wants for a return. + + return evt_cls.unwrap(identifier, kw) + + return util.update_wrapper(go, fn) + +def listen_raw(fn, identifier, target, *args, **kw): + """Listen for events, accepting an event function that's "raw". + Only the exact arguments are received in order. + + This is used by SQLA internals simply to reduce the overhead + of creating an event dictionary for each event call. + + """ # rationale - the events on ClassManager, Session, and Mapper # will need to accept mapped classes directly as targets and know @@ -85,7 +137,9 @@ class Events(object): def listen(cls, fn, identifier, target): getattr(target.dispatch, identifier).append(fn, target) - + @classmethod + def unwrap(cls, identifier, event): + return None class _DispatchDescriptor(object): """Class-level attributes on _Dispatch classes.""" diff --git a/lib/sqlalchemy/interfaces.py b/lib/sqlalchemy/interfaces.py index 36573bf437..d502afbd8c 100644 --- a/lib/sqlalchemy/interfaces.py +++ b/lib/sqlalchemy/interfaces.py @@ -13,7 +13,7 @@ class PoolListener(object): """Hooks into the lifecycle of connections in a :class:`Pool`. .. note:: :class:`PoolListener` is deprecated. Please - refer to :func:`event.listen` as well as + refer to :func:`event.listen_raw` as well as :attr:`.Pool.events`. Usage:: @@ -75,13 +75,13 @@ class PoolListener(object): listener = as_interface(listener, methods=('connect', 'first_connect', 'checkout', 'checkin')) if hasattr(listener, 'connect'): - event.listen(listener.connect, 'on_connect', self) + event.listen_raw(listener.connect, 'on_connect', self) if hasattr(listener, 'first_connect'): - event.listen(listener.first_connect, 'on_first_connect', self) + event.listen_raw(listener.first_connect, 'on_first_connect', self) if hasattr(listener, 'checkout'): - event.listen(listener.checkout, 'on_checkout', self) + event.listen_raw(listener.checkout, 'on_checkout', self) if hasattr(listener, 'checkin'): - event.listen(listener.checkin, 'on_checkin', self) + event.listen_raw(listener.checkin, 'on_checkin', self) def connect(self, dbapi_con, con_record): @@ -146,7 +146,7 @@ class ConnectionProxy(object): """Allows interception of statement execution by Connections. .. note:: :class:`ConnectionProxy` is deprecated. Please - refer to :func:`event.listen` as well as + refer to :func:`event.listen_raw` as well as :attr:`.Engine.events`. Either or both of the ``execute()`` and ``cursor_execute()`` @@ -175,29 +175,48 @@ class ConnectionProxy(object): @classmethod def _adapt_listener(cls, self, listener): - event.listen(listener.execute, 'on_execute', self) + # TODO: suppose if new style listeners used here. then we say: + + # def _wrap_in_some_way(legacy_listener): + # def go(clauseelement, *multiparams, **params): + # # 'fake' execute function. in reality just repopulates + # # the event with the given args in case they were modified. + # args.update({'clauseelement':clauseelement, 'multiparams':multiparams, 'params':params}) + # return args + # def listen(evt, args): + # return legacy_listener(args['conn'], go, args['clauseelement'], *args['multiparams'], **args['params']) + # + # event.listen(_wrap_in_some_way(self.execute), 'on_execute', self) + # + # that way all the complex crap is left in the legacy adapter, and the "re-execute" idea is + # scrapped, since it was fairly pointless. The proxyconnection stuff in base.py can just + # iterate through listeners. + # + + event.listen_raw(listener.execute, 'on_execute', self) def _adapt_cursor_execute(conn, execute, cursor, statement, parameters, context, executemany): def _re_execute(cursor, statement, parameters, context): return execute(cursor, statement, parameters, context, executemany) return listener.cursor_execute(_re_execute, cursor, statement, parameters, context, executemany) - event.listen(_adapt_cursor_execute, 'on_cursor_execute', self) - event.listen(listener.begin, 'on_begin', self) - event.listen(listener.rollback, 'on_rollback', self) - event.listen(listener.commit, 'on_commit', self) - event.listen(listener.savepoint, 'on_savepoint', self) - event.listen(listener.rollback_savepoint, 'on_rollback_savepoint', self) - event.listen(listener.release_savepoint, 'on_release_savepoint', self) - event.listen(listener.begin_twophase, 'on_begin_twophase', self) - event.listen(listener.prepare_twophase, 'on_prepare_twophase', self) - event.listen(listener.rollback_twophase, 'on_rollback_twophase', self) - event.listen(listener.commit_twophase, 'on_commit_twophase', self) + event.listen_raw(_adapt_cursor_execute, 'on_cursor_execute', self) + event.listen_raw(listener.begin, 'on_begin', self) + event.listen_raw(listener.rollback, 'on_rollback', self) + event.listen_raw(listener.commit, 'on_commit', self) + event.listen_raw(listener.savepoint, 'on_savepoint', self) + event.listen_raw(listener.rollback_savepoint, 'on_rollback_savepoint', self) + event.listen_raw(listener.release_savepoint, 'on_release_savepoint', self) + event.listen_raw(listener.begin_twophase, 'on_begin_twophase', self) + event.listen_raw(listener.prepare_twophase, 'on_prepare_twophase', self) + event.listen_raw(listener.rollback_twophase, 'on_rollback_twophase', self) + event.listen_raw(listener.commit_twophase, 'on_commit_twophase', self) def execute(self, conn, execute, clauseelement, *multiparams, **params): """Intercept high level execute() events.""" + return execute(clauseelement, *multiparams, **params) def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): diff --git a/lib/sqlalchemy/orm/attributes.py b/lib/sqlalchemy/orm/attributes.py index 729968940b..93ef1a6659 100644 --- a/lib/sqlalchemy/orm/attributes.py +++ b/lib/sqlalchemy/orm/attributes.py @@ -131,6 +131,10 @@ class QueryableAttribute(interfaces.PropComparator): if active_history: target.active_history = True event.Events.listen(fn, identifier, target) + + @classmethod + def unwrap(cls, identifier, event): + return event['value'] def on_append(self, state, value, initiator): """Receive a collection append event. diff --git a/lib/sqlalchemy/orm/interfaces.py b/lib/sqlalchemy/orm/interfaces.py index 77a387f845..f53fb22406 100644 --- a/lib/sqlalchemy/orm/interfaces.py +++ b/lib/sqlalchemy/orm/interfaces.py @@ -880,7 +880,7 @@ class AttributeExtension(object): """An event handler for individual attribute change events. .. note:: :class:`AttributeExtension` is deprecated. Please - refer to :func:`event.listen` as well as + refer to :func:`event.listen_raw` as well as :attr:`AttributeImpl.events`. AttributeExtension is assembled within the descriptors associated @@ -895,9 +895,9 @@ class AttributeExtension(object): @classmethod def _adapt_listener(cls, self, listener): - event.listen(listener.append, 'on_append', self, active_history=listener.active_history) - event.listen(listener.remove, 'on_remove', self, active_history=listener.active_history) - event.listen(listener.set, 'on_set', self, active_history=listener.active_history) + event.listen_raw(listener.append, 'on_append', self, active_history=listener.active_history) + event.listen_raw(listener.remove, 'on_remove', self, active_history=listener.active_history) + event.listen_raw(listener.set, 'on_set', self, active_history=listener.active_history) def append(self, state, value, initiator): diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index ddb08039aa..cb000024a4 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -400,14 +400,14 @@ class Mapper(object): if manager.info.get(_INSTRUMENTOR, False): return - event.listen(_event_on_init, 'on_init', manager) - event.listen(_event_on_init_failure, 'on_init_failure', manager) - event.listen(_event_on_resurrect, 'on_resurrect', manager) + event.listen_raw(_event_on_init, 'on_init', manager) + event.listen_raw(_event_on_init_failure, 'on_init_failure', manager) + event.listen_raw(_event_on_resurrect, 'on_resurrect', manager) for key, method in util.iterate_attributes(self.class_): if isinstance(method, types.FunctionType): if hasattr(method, '__sa_reconstructor__'): - event.listen(method, 'on_load', manager) + event.listen_raw(method, 'on_load', manager) elif hasattr(method, '__sa_validators__'): for name in method.__sa_validators__: self._validators[name] = method @@ -415,7 +415,7 @@ class Mapper(object): if 'reconstruct_instance' in self.extension: def reconstruct(instance): self.extension.reconstruct_instance(self, instance) - event.listen(reconstruct, 'on_load', manager) + event.listen_raw(reconstruct, 'on_load', manager) manager.info[_INSTRUMENTOR] = self diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 6c7e01c6dd..5d14c17898 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -211,7 +211,7 @@ class Pool(log.Identified): dispatch = event.dispatcher(PoolEvents) - @util.deprecated("Pool.add_listener is deprecated. Use event.listen()") + @util.deprecated(2.7, "Pool.add_listener is deprecated. Use event.listen()") def add_listener(self, listener): """Add a :class:`.PoolListener`-like object to this pool. diff --git a/lib/sqlalchemy/test/engines.py b/lib/sqlalchemy/test/engines.py index 779f872646..2b1223c27d 100644 --- a/lib/sqlalchemy/test/engines.py +++ b/lib/sqlalchemy/test/engines.py @@ -135,9 +135,9 @@ def testing_engine(url=None, options=None): options = options or config.db_opts engine = create_engine(url, **options) - event.listen(asserter.execute, 'on_execute', engine) - event.listen(asserter.cursor_execute, 'on_cursor_execute', engine) - event.listen(testing_reaper.checkout, 'on_checkout', engine.pool) + event.listen_raw(asserter.execute, 'on_execute', engine) + event.listen_raw(asserter.cursor_execute, 'on_cursor_execute', engine) + event.listen_raw(testing_reaper.checkout, 'on_checkout', engine.pool) # may want to call this, results # in first-connect initializers diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index cacc5385aa..329e2ea646 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -291,8 +291,21 @@ class ResultProxyTest(TestBase): class EngineEventsTest(TestBase): + def _assert_stmts(self, expected, received): + for stmt, params, posn in expected: + if not received: + assert False + while received: + teststmt, testparams, testmultiparams = \ + received.pop(0) + teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', + teststmt).strip() + if teststmt.startswith(stmt) and (testparams + == params or testparams == posn): + break + @testing.fails_on('firebird', 'Data type unknown') - def test_execute_events(self): + def test_execute_events_raw(self): stmts = [] cursor_stmts = [] @@ -307,26 +320,14 @@ class EngineEventsTest(TestBase): cursor_stmts.append((str(statement), parameters, None)) return execute(cursor, statement, parameters, context, executemany) - def assert_stmts(expected, received): - for stmt, params, posn in expected: - if not received: - assert False - while received: - teststmt, testparams, testmultiparams = \ - received.pop(0) - teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', - teststmt).strip() - if teststmt.startswith(stmt) and (testparams - == params or testparams == posn): - break for engine in [ -# engines.testing_engine(options=dict(implicit_returning=False)), + engines.testing_engine(options=dict(implicit_returning=False)), engines.testing_engine(options=dict(implicit_returning=False, strategy='threadlocal')) ]: - event.listen(execute, 'on_execute', engine) - event.listen(cursor_execute, 'on_cursor_execute', engine) + event.listen_raw(execute, 'on_execute', engine) + event.listen_raw(cursor_execute, 'on_cursor_execute', engine) m = MetaData(engine) t1 = Table('t1', m, @@ -373,10 +374,81 @@ class EngineEventsTest(TestBase): ('select * from t1', {}, ()), ('DROP TABLE t1' , {}, ())] # bind param name 'lower_2' might # be incorrect - assert_stmts(compiled, stmts) - assert_stmts(cursor, cursor_stmts) + self._assert_stmts(compiled, stmts) + self._assert_stmts(cursor, cursor_stmts) - def test_options(self): + @testing.fails_on('firebird', 'Data type unknown') + def _broken_test_execute_events_generic(self): + + stmts = [] + cursor_stmts = [] + + def listen(event_name, args): + if event_name == 'on_execute': + clauseelement, params, multiparams = \ + args['clauseelement'], args['params'], args['multiparams'] + stmts.append((str(clauseelement), params, multiparams)) + elif event_name == 'on_cursor_execute': + statement, parameters = args['statement'], args['parameters'] + cursor_stmts.append((str(statement), parameters, None)) + + for engine in [ + engines.testing_engine(options=dict(implicit_returning=False)), + engines.testing_engine(options=dict(implicit_returning=False, + strategy='threadlocal')) + ]: + event.listen(listen, 'on_execute', engine) + event.listen(listen, 'on_cursor_execute', engine) + + m = MetaData(engine) + t1 = Table('t1', m, + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), + primary_key=True) + ) + m.create_all() + try: + t1.insert().execute(c1=5, c2='some data') + t1.insert().execute(c1=6) + eq_(engine.execute('select * from t1').fetchall(), [(5, + 'some data'), (6, 'foo')]) + finally: + m.drop_all() + engine.dispose() + compiled = [('CREATE TABLE t1', {}, None), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', + 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', + {'c1': 6}, None), ('select * from t1', {}, + None), ('DROP TABLE t1', {}, None)] + if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr + # eexecute_pk_sequence + # s: + cursor = [ + ('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1' + : 5}, (5, 'some data')), + ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )), + ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6}, + (6, 'foo')), + ('select * from t1', {}, ()), + ('DROP TABLE t1', {}, ()), + ] + else: + insert2_params = 6, 'Foo' + if testing.against('oracle+zxjdbc'): + insert2_params += (ReturningParam(12), ) + cursor = [('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data' + , 'c1': 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', {'c1': 6, + 'lower_2': 'Foo'}, insert2_params), + ('select * from t1', {}, ()), ('DROP TABLE t1' + , {}, ())] # bind param name 'lower_2' might + # be incorrect + self._assert_stmts(compiled, stmts) + self._assert_stmts(cursor, cursor_stmts) + + def test_options_raw(self): track = [] def on_execute(conn, exec_, *args, **kw): track.append('execute') @@ -387,8 +459,8 @@ class EngineEventsTest(TestBase): return exec_(*args, **kw) engine = engines.testing_engine() - event.listen(on_execute, 'on_execute', engine) - event.listen(on_cursor_execute, 'on_cursor_execute', engine) + event.listen_raw(on_execute, 'on_execute', engine) + event.listen_raw(on_cursor_execute, 'on_cursor_execute', engine) conn = engine.connect() c2 = conn.execution_options(foo='bar') eq_(c2._execution_options, {'foo':'bar'}) @@ -398,7 +470,7 @@ class EngineEventsTest(TestBase): eq_(track, ['execute', 'cursor_execute']) - def test_transactional(self): + def test_transactional_raw(self): track = [] def tracker(name): def go(conn, exec_, *args, **kw): @@ -407,11 +479,11 @@ class EngineEventsTest(TestBase): return go engine = engines.testing_engine() - event.listen(tracker('execute'), 'on_execute', engine) - event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine) - event.listen(tracker('begin'), 'on_begin', engine) - event.listen(tracker('commit'), 'on_commit', engine) - event.listen(tracker('rollback'), 'on_rollback', engine) + event.listen_raw(tracker('execute'), 'on_execute', engine) + event.listen_raw(tracker('cursor_execute'), 'on_cursor_execute', engine) + event.listen_raw(tracker('begin'), 'on_begin', engine) + event.listen_raw(tracker('commit'), 'on_commit', engine) + event.listen_raw(tracker('rollback'), 'on_rollback', engine) conn = engine.connect() trans = conn.begin() @@ -428,7 +500,7 @@ class EngineEventsTest(TestBase): @testing.requires.savepoints @testing.requires.two_phase_transactions - def test_transactional_advanced(self): + def test_transactional_advanced_raw(self): track = [] def tracker(name): def go(conn, exec_, *args, **kw): @@ -441,7 +513,7 @@ class EngineEventsTest(TestBase): 'rollback_savepoint', 'release_savepoint', 'rollback', 'begin_twophase', 'prepare_twophase', 'commit_twophase']: - event.listen(tracker(name), 'on_%s' % name, engine) + event.listen_raw(tracker(name), 'on_%s' % name, engine) conn = engine.connect() @@ -464,6 +536,7 @@ class EngineEventsTest(TestBase): 'rollback', 'begin_twophase', 'prepare_twophase', 'commit_twophase'] ) + class ProxyConnectionTest(TestBase): """These are the same tests as EngineEventsTest, except using -- 2.47.3