on_before_execute and on_after_execute really not appealing here. might have to just go back to
what it was the other day.
target.dispatch)
event.Events.listen(fn, identifier, target)
- @classmethod
- def unwrap(cls, identifier, args):
- return args
-
- def on_execute(self, conn, execute, clauseelement, *multiparams, **params):
+ def on_execute(self, conn, clauseelement, *multiparams, **params):
"""Intercept high level execute() events."""
- def on_cursor_execute(self, conn, execute, cursor, statement,
+ def on_cursor_execute(self, conn, cursor, statement,
parameters, context, executemany):
"""Intercept low-level cursor execute() events."""
- def on_begin(self, conn, begin):
+ def on_begin(self, conn):
"""Intercept begin() events."""
- def on_rollback(self, conn, rollback):
+ def on_rollback(self, conn):
"""Intercept rollback() events."""
- def on_commit(self, conn, commit):
+ def on_commit(self, conn):
"""Intercept commit() events."""
- def on_savepoint(self, conn, savepoint, name=None):
+ def on_savepoint(self, conn, name=None):
"""Intercept savepoint() events."""
- def on_rollback_savepoint(self, conn, rollback_savepoint, name, context):
+ def on_rollback_savepoint(self, conn, name, context):
"""Intercept rollback_savepoint() events."""
- def on_release_savepoint(self, conn, release_savepoint, name, context):
+ def on_release_savepoint(self, conn, name, context):
"""Intercept release_savepoint() events."""
- def on_begin_twophase(self, conn, begin_twophase, xid):
+ def on_begin_twophase(self, conn, xid):
"""Intercept begin_twophase() events."""
- def on_prepare_twophase(self, conn, prepare_twophase, xid):
+ def on_prepare_twophase(self, conn, xid):
"""Intercept prepare_twophase() events."""
- def on_rollback_twophase(self, conn, rollback_twophase, xid, is_prepared):
+ def on_rollback_twophase(self, conn, xid, is_prepared):
"""Intercept rollback_twophase() events."""
- def on_commit_twophase(self, conn, commit_twophase, xid, is_prepared):
+ def on_commit_twophase(self, conn, xid, is_prepared):
"""Intercept commit_twophase() events."""
class Engine(Connectable, log.Identified):
return self.pool.unique_connection()
def _proxy_connection_cls(cls, dispatch):
- # TODO: this is insane.
- # consider some different method of
- # event propagation / control, possibly
- # requiring the (target, args) style of calling.
- # arguments can simply be modified within the "args"
- # dictionary.
-
- # perhaps:
-# def execute(self, clauseelement, *multiparams, **params):
-# for fn in dispatch.on_execute:
-# ret = fn(clauseelement, multiparams, params)
-# if ret:
-# clauseelement, multiparams, params = \
-# ret['clauseelment'], ret['multiparams'], ret['params']
-
- def _exec_recursive(conn, fns, orig):
- if not fns:
- return orig
- def go(*arg, **kw):
- nested = _exec_recursive(conn, fns[1:], orig)
- ret = fns[0](conn, nested, *arg, **kw)
- return ret
- return go
-
class ProxyConnection(cls):
def execute(self, clauseelement, *multiparams, **params):
- g = _exec_recursive(self, dispatch.on_execute,
- super(ProxyConnection, self).execute)
- return g(clauseelement, *multiparams, **params)
+ for fn in dispatch.on_execute:
+ result = fn(self, clauseelement, *multiparams, **params)
+ if result:
+ clauseelement, multiparams, params = result
+
+ return super(ProxyConnection, self).execute(clauseelement, *multiparams, **params)
def _execute_clauseelement(self, clauseelement, multiparams=None, params=None):
return self.execute(clauseelement, *(multiparams or []), **(params or {}))
- # TODO : this is all wrong, cursor_execute() and
- # cursor_executemany() don't have a return value, need to find some
- # other way to check for executed on these
-
def _cursor_execute(self, cursor, statement,
parameters, context=None):
- g = _exec_recursive(self, dispatch.on_cursor_execute,
- self._cursor_exec)
- return g(cursor, statement, parameters, context, False)
-
- def _cursor_executemany(self, cursor, statement, parameters,
- context=None, ):
- g = _exec_recursive(self, dispatch.on_cursor_execute,
- self._cursor_exec)
- return g(cursor, statement, parameters, context, True)
-
- def _cursor_exec(self, cursor, statement, parameters, context,
- executemany):
- if executemany:
- return super(ProxyConnection,
- self)._cursor_executemany(cursor,
- statement, parameters, context)
- else:
- return super(ProxyConnection,
- self)._cursor_execute(cursor, statement,
- parameters, context)
+ for fn in dispatch.on_cursor_execute:
+ result = fn(self, cursor, statement, parameters, context, False)
+ if result:
+ statement, parameters = result
+
+ return super(ProxyConnection, self).\
+ _cursor_execute(cursor, statement, parameters, context)
+
+ def _cursor_executemany(self, cursor, statement,
+ parameters, context=None):
+ for fn in dispatch.on_cursor_execute:
+ result = fn(self, cursor, statement, parameters, context, True)
+ if result:
+ statement, parameters = result
+
+ return super(ProxyConnection, self).\
+ _cursor_executemany(cursor, statement, parameters, context)
def _begin_impl(self):
- g = _exec_recursive(self, dispatch.on_begin,
- super(ProxyConnection, self)._begin_impl)
- return g()
+ for fn in dispatch.on_begin:
+ fn(self)
+ return super(ProxyConnection, self).\
+ _begin_impl()
def _rollback_impl(self):
- g = _exec_recursive(self, dispatch.on_rollback,
- super(ProxyConnection, self)._rollback_impl)
- return g()
+ for fn in dispatch.on_rollback:
+ fn(self)
+ return super(ProxyConnection, self).\
+ _rollback_impl()
def _commit_impl(self):
- g = _exec_recursive(self, dispatch.on_commit,
- super(ProxyConnection, self)._commit_impl)
- return g()
+ for fn in dispatch.on_commit:
+ fn(self)
+ return super(ProxyConnection, self).\
+ _commit_impl()
def _savepoint_impl(self, name=None):
- g = _exec_recursive(self, dispatch.on_savepoint,
- super(ProxyConnection, self)._savepoint_impl)
- return g(name=name)
-
+ for fn in dispatch.on_savepoint:
+ fn(self, name)
+ return super(ProxyConnection, self).\
+ _savepoint_impl(name=name)
+
def _rollback_to_savepoint_impl(self, name, context):
- g = _exec_recursive(self, dispatch.on_rollback_savepoint,
- super(ProxyConnection, self)._rollback_to_savepoint_impl)
- return g(name, context)
+ for fn in dispatch.on_rollback_to_savepoint:
+ fn(self, name, context)
+ return super(ProxyConnection, self).\
+ _rollback_to_savepoint_impl(name, context)
def _release_savepoint_impl(self, name, context):
- g = _exec_recursive(self, dispatch.on_release_savepoint,
- super(ProxyConnection, self)._release_savepoint_impl)
- return g(name, context)
+ for fn in dispatch.on_release_savepoint:
+ fn(self, name, context)
+ return super(ProxyConnection, self).\
+ _release_savepoint_impl(name, context)
def _begin_twophase_impl(self, xid):
- g = _exec_recursive(self, dispatch.on_begin_twophase,
- super(ProxyConnection, self)._begin_twophase_impl)
- return g(xid)
+ for fn in dispatch.on_begin_twophase:
+ fn(self, xid)
+ return super(ProxyConnection, self).\
+ _begin_twophase_impl(xid)
def _prepare_twophase_impl(self, xid):
- g = _exec_recursive(self, dispatch.on_prepare_twophase,
- super(ProxyConnection, self)._prepare_twophase_impl)
- return g(xid)
+ for fn in dispatch.on_prepare_twophase:
+ fn(self, xid)
+ return super(ProxyConnection, self).\
+ _prepare_twophase_impl(xid)
def _rollback_twophase_impl(self, xid, is_prepared):
- g = _exec_recursive(self, dispatch.on_rollback_twophase,
- super(ProxyConnection, self)._rollback_twophase_impl)
- return g(xid, is_prepared)
+ for fn in dispatch.on_rollback_twophase:
+ fn(self, xid)
+ return super(ProxyConnection, self).\
+ _rollback_twophase_impl(xid)
def _commit_twophase_impl(self, xid, is_prepared):
- g = _exec_recursive(self, dispatch.on_commit_twophase,
- super(ProxyConnection, self)._commit_twophase_impl)
- return g(xid, is_prepared)
+ for fn in dispatch.on_commit_twophase:
+ fn(self, xid)
+ return super(ProxyConnection, self).\
+ _commit_twophase_impl(xid)
return ProxyConnection
return
do_on_connect(conn)
- event.listen_raw(on_connect, 'on_first_connect', pool)
- event.listen_raw(on_connect, 'on_connect', pool)
+ event.listen(on_connect, 'on_first_connect', pool)
+ event.listen(on_connect, 'on_connect', pool)
def first_connect(dbapi_connection, connection_record):
c = base.Connection(engine, connection=dbapi_connection)
dialect.initialize(c)
- event.listen_raw(first_connect, 'on_first_connect', pool)
+ event.listen(first_connect, 'on_first_connect', pool)
return engine
"""
from sqlalchemy import util
-import inspect
-def listen(fn, identifier, target, *args, **kw):
- """Listen for events, passing to fn.
-
- Event listener functions are in a consistent format::
-
- def listen(event_name, args):
- # ...
-
- Where ``event_name`` is the string name, the same
- as ``identifier``, and ``args`` is a dict containing
- an entry for each argument. The names match those
- of the event declaration.
-
- """
-
- for evt_cls in _registrars[identifier]:
- for tgt in evt_cls.accept_with(target):
- fn = _create_wrapper(evt_cls, fn, identifier)
- tgt.dispatch.listen(fn, identifier, tgt, *args, **kw)
- break
+CANCEL = util.symbol('CANCEL')
+NO_RETVAL = util.symbol('NO_RETVAL')
-def _create_wrapper(evt_cls, fn, identifier):
- argspec = inspect.getargspec(getattr(evt_cls, identifier))
- arg, varargs, keywords, defaults = argspec
- def go(*args, **kw):
- # here we are coercing the *arg, **kw to a single
- # dictionary.
-
- # TODO: defaults
- if keywords:
- kw = {keywords:kw}
- for n, v in zip(arg[1:], args):
- kw[n] = v
- if varargs:
- kw[varargs] = arg[len(args)+1:]
-
- fn(identifier, kw)
-
- # then here, we ask the Events subclass to interpret
- # the dictionary back to what it wants for a return.
-
- return evt_cls.unwrap(identifier, kw)
-
- return util.update_wrapper(go, fn)
-
-def listen_raw(fn, identifier, target, *args, **kw):
+def listen(fn, identifier, target, *args, **kw):
"""Listen for events, accepting an event function that's "raw".
Only the exact arguments are received in order.
@property
def descriptors(self):
return (getattr(self, k) for k in dir(self) if k.startswith("on_"))
-
+
def update(self, other):
"""Populate from the listeners in another :class:`Events` object."""
def listen(cls, fn, identifier, target):
getattr(target.dispatch, identifier).append(fn, target)
- @classmethod
- def unwrap(cls, identifier, event):
- return None
+# def update(self, other):
+# """Populate from the listeners in another :class:`Events` object."""
+
+# for ls in other.events:
+# getattr(self, ls.name).listeners.extend(ls.listeners)
class _DispatchDescriptor(object):
"""Class-level attributes on _Dispatch classes."""
if not self._exec_once:
self(*args, **kw)
self._exec_once = True
-
+
def exec_until_return(self, *args, **kw):
"""Execute listeners for this event until
one returns a non-None value.
"""Interfaces and abstract types."""
-from sqlalchemy.util import as_interface
-from sqlalchemy import event
+from sqlalchemy import event, util
class PoolListener(object):
"""Hooks into the lifecycle of connections in a :class:`Pool`.
.. note:: :class:`PoolListener` is deprecated. Please
- refer to :func:`event.listen_raw` as well as
+ refer to :func:`event.listen` as well as
:attr:`.Pool.events`.
Usage::
"""
- listener = as_interface(listener, methods=('connect',
+ listener = util.as_interface(listener, methods=('connect',
'first_connect', 'checkout', 'checkin'))
if hasattr(listener, 'connect'):
- event.listen_raw(listener.connect, 'on_connect', self)
+ event.listen(listener.connect, 'on_connect', self)
if hasattr(listener, 'first_connect'):
- event.listen_raw(listener.first_connect, 'on_first_connect', self)
+ event.listen(listener.first_connect, 'on_first_connect', self)
if hasattr(listener, 'checkout'):
- event.listen_raw(listener.checkout, 'on_checkout', self)
+ event.listen(listener.checkout, 'on_checkout', self)
if hasattr(listener, 'checkin'):
- event.listen_raw(listener.checkin, 'on_checkin', self)
+ event.listen(listener.checkin, 'on_checkin', self)
def connect(self, dbapi_con, con_record):
"""Allows interception of statement execution by Connections.
.. note:: :class:`ConnectionProxy` is deprecated. Please
- refer to :func:`event.listen_raw` as well as
+ refer to :func:`event.listen` as well as
:attr:`.Engine.events`.
Either or both of the ``execute()`` and ``cursor_execute()``
@classmethod
def _adapt_listener(cls, self, listener):
- # TODO: suppose if new style listeners used here. then we say:
-
- # def _wrap_in_some_way(legacy_listener):
- # def go(clauseelement, *multiparams, **params):
- # # 'fake' execute function. in reality just repopulates
- # # the event with the given args in case they were modified.
- # args.update({'clauseelement':clauseelement, 'multiparams':multiparams, 'params':params})
- # return args
- # def listen(evt, args):
- # return legacy_listener(args['conn'], go, args['clauseelement'], *args['multiparams'], **args['params'])
- #
- # event.listen(_wrap_in_some_way(self.execute), 'on_execute', self)
- #
- # that way all the complex crap is left in the legacy adapter, and the "re-execute" idea is
- # scrapped, since it was fairly pointless. The proxyconnection stuff in base.py can just
- # iterate through listeners.
- #
-
- event.listen_raw(listener.execute, 'on_execute', self)
- def _adapt_cursor_execute(conn, execute, cursor, statement,
+
+ def adapt_execute(conn, clauseelement, *multiparams, **params):
+ def execute_wrapper(clauseelement, *multiparams, **params):
+ return clauseelement, multiparams, params
+ return listener.execute(conn, execute_wrapper, clauseelement, *multiparams, **params)
+
+ event.listen(adapt_execute, 'on_execute', self)
+
+ def adapt_cursor_execute(conn, cursor, statement,
parameters, context, executemany):
- def _re_execute(cursor, statement, parameters, context):
- return execute(cursor, statement, parameters, context, executemany)
- return listener.cursor_execute(_re_execute, cursor, statement,
+ def execute_wrapper(cursor, statement, parameters, context):
+ return statement, parameters
+ return listener.cursor_execute(execute_wrapper, cursor, statement,
parameters, context, executemany)
- event.listen_raw(_adapt_cursor_execute, 'on_cursor_execute', self)
- event.listen_raw(listener.begin, 'on_begin', self)
- event.listen_raw(listener.rollback, 'on_rollback', self)
- event.listen_raw(listener.commit, 'on_commit', self)
- event.listen_raw(listener.savepoint, 'on_savepoint', self)
- event.listen_raw(listener.rollback_savepoint, 'on_rollback_savepoint', self)
- event.listen_raw(listener.release_savepoint, 'on_release_savepoint', self)
- event.listen_raw(listener.begin_twophase, 'on_begin_twophase', self)
- event.listen_raw(listener.prepare_twophase, 'on_prepare_twophase', self)
- event.listen_raw(listener.rollback_twophase, 'on_rollback_twophase', self)
- event.listen_raw(listener.commit_twophase, 'on_commit_twophase', self)
+
+ event.listen(adapt_cursor_execute, 'on_cursor_execute', self)
+
+ def do_nothing_callback(*arg, **kw):
+ pass
+
+ def adapt_listener(fn):
+ def go(conn, *arg, **kw):
+ fn(conn, do_nothing_callback, *arg, **kw)
+ return util.update_wrapper(go, fn)
+
+ event.listen(adapt_listener(listener.begin), 'on_begin', self)
+ event.listen(adapt_listener(listener.rollback), 'on_rollback', self)
+ event.listen(adapt_listener(listener.commit), 'on_commit', self)
+ event.listen(adapt_listener(listener.savepoint), 'on_savepoint', self)
+ event.listen(adapt_listener(listener.rollback_savepoint), 'on_rollback_savepoint', self)
+ event.listen(adapt_listener(listener.release_savepoint), 'on_release_savepoint', self)
+ event.listen(adapt_listener(listener.begin_twophase), 'on_begin_twophase', self)
+ event.listen(adapt_listener(listener.prepare_twophase), 'on_prepare_twophase', self)
+ event.listen(adapt_listener(listener.rollback_twophase), 'on_rollback_twophase', self)
+ event.listen(adapt_listener(listener.commit_twophase), 'on_commit_twophase', self)
def execute(self, conn, execute, clauseelement, *multiparams, **params):
previous is not None and
previous is not PASSIVE_NO_RESULT):
self.sethasparent(instance_state(previous), False)
-
+
for fn in self.dispatch.on_set:
value = fn(state, value, previous, initiator or self)
"""An event handler for individual attribute change events.
.. note:: :class:`AttributeExtension` is deprecated. Please
- refer to :func:`event.listen_raw` as well as
+ refer to :func:`event.listen` as well as
:attr:`AttributeImpl.events`.
AttributeExtension is assembled within the descriptors associated
@classmethod
def _adapt_listener(cls, self, listener):
- event.listen_raw(listener.append, 'on_append', self, active_history=listener.active_history)
- event.listen_raw(listener.remove, 'on_remove', self, active_history=listener.active_history)
- event.listen_raw(listener.set, 'on_set', self, active_history=listener.active_history)
+ event.listen(listener.append, 'on_append', self, active_history=listener.active_history)
+ event.listen(listener.remove, 'on_remove', self, active_history=listener.active_history)
+ event.listen(listener.set, 'on_set', self, active_history=listener.active_history)
def append(self, state, value, initiator):
if manager.info.get(_INSTRUMENTOR, False):
return
- event.listen_raw(_event_on_init, 'on_init', manager)
- event.listen_raw(_event_on_init_failure, 'on_init_failure', manager)
- event.listen_raw(_event_on_resurrect, 'on_resurrect', manager)
+ event.listen(_event_on_init, 'on_init', manager)
+ event.listen(_event_on_init_failure, 'on_init_failure', manager)
+ event.listen(_event_on_resurrect, 'on_resurrect', manager)
for key, method in util.iterate_attributes(self.class_):
if isinstance(method, types.FunctionType):
if hasattr(method, '__sa_reconstructor__'):
- event.listen_raw(method, 'on_load', manager)
+ event.listen(method, 'on_load', manager)
elif hasattr(method, '__sa_validators__'):
for name in method.__sa_validators__:
self._validators[name] = method
if 'reconstruct_instance' in self.extension:
def reconstruct(instance):
self.extension.reconstruct_instance(self, instance)
- event.listen_raw(reconstruct, 'on_load', manager)
+ event.listen(reconstruct, 'on_load', manager)
manager.info[_INSTRUMENTOR] = self
self._result = equivalent
if not self._result:
+ print "Testing for compiled statement %r partial params %r, " \
+ "received %r with params %r" % \
+ (self.statement, all_params, _received_statement, all_received)
+
self._errmsg = "Testing for compiled statement %r partial params %r, " \
"received %r with params %r" % \
(self.statement, all_params, _received_statement, all_received)
def clear_rules(self):
del self.rules
- def execute(self, conn, execute, clauseelement, *multiparams, **params):
- result = execute(clauseelement, *multiparams, **params)
-
+ def execute(self, conn, clauseelement, *multiparams, **params):
+ # TODO: this doesn't work. we need to execute before so that we know
+ # what's happened with the parameters.
+
if self.rules is not None:
if not self.rules:
assert False, "All rules have been exhausted, but further statements remain"
if rule.is_consumed():
self.rules.pop(0)
- return result
-
- def cursor_execute(self, conn, execute, cursor, statement, parameters, context, executemany):
- result = execute(cursor, statement, parameters, context, executemany)
+ def cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
+ print "RECEIVE !", statement, parameters
if self.rules:
rule = self.rules[0]
rule.process_cursor_execute(statement, parameters, context, executemany)
- return result
-
asserter = SQLAssert()
options = options or config.db_opts
engine = create_engine(url, **options)
- event.listen_raw(asserter.execute, 'on_execute', engine)
- event.listen_raw(asserter.cursor_execute, 'on_cursor_execute', engine)
- event.listen_raw(testing_reaper.checkout, 'on_checkout', engine.pool)
+ event.listen(asserter.execute, 'on_execute', engine)
+ event.listen(asserter.cursor_execute, 'on_cursor_execute', engine)
+ event.listen(testing_reaper.checkout, 'on_checkout', engine.pool)
# may want to call this, results
# in first-connect initializers
stmts = []
cursor_stmts = []
- def execute(conn, execute, clauseelement, *multiparams,
+ def execute(conn, clauseelement, *multiparams,
**params ):
stmts.append((str(clauseelement), params, multiparams))
- return execute(clauseelement, *multiparams, **params)
- def cursor_execute(conn, execute, cursor, statement, parameters,
+ def cursor_execute(conn, cursor, statement, parameters,
context, executemany):
cursor_stmts.append((str(statement), parameters, None))
- return execute(cursor, statement, parameters, context, executemany)
for engine in [
engines.testing_engine(options=dict(implicit_returning=False,
strategy='threadlocal'))
]:
- event.listen_raw(execute, 'on_execute', engine)
- event.listen_raw(cursor_execute, 'on_cursor_execute', engine)
+ event.listen(execute, 'on_execute', engine)
+ event.listen(cursor_execute, 'on_cursor_execute', engine)
m = MetaData(engine)
t1 = Table('t1', m,
def test_options_raw(self):
track = []
- def on_execute(conn, exec_, *args, **kw):
+ def on_execute(conn, *args, **kw):
track.append('execute')
- return exec_(*args, **kw)
- def on_cursor_execute(conn, exec_, *args, **kw):
+ def on_cursor_execute(conn, *args, **kw):
track.append('cursor_execute')
- return exec_(*args, **kw)
engine = engines.testing_engine()
- event.listen_raw(on_execute, 'on_execute', engine)
- event.listen_raw(on_cursor_execute, 'on_cursor_execute', engine)
+ event.listen(on_execute, 'on_execute', engine)
+ event.listen(on_cursor_execute, 'on_cursor_execute', engine)
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
eq_(c2._execution_options, {'foo':'bar'})
def test_transactional_raw(self):
track = []
def tracker(name):
- def go(conn, exec_, *args, **kw):
+ def go(conn, *args, **kw):
track.append(name)
- return exec_(*args, **kw)
return go
engine = engines.testing_engine()
- event.listen_raw(tracker('execute'), 'on_execute', engine)
- event.listen_raw(tracker('cursor_execute'), 'on_cursor_execute', engine)
- event.listen_raw(tracker('begin'), 'on_begin', engine)
- event.listen_raw(tracker('commit'), 'on_commit', engine)
- event.listen_raw(tracker('rollback'), 'on_rollback', engine)
+ event.listen(tracker('execute'), 'on_execute', engine)
+ event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine)
+ event.listen(tracker('begin'), 'on_begin', engine)
+ event.listen(tracker('commit'), 'on_commit', engine)
+ event.listen(tracker('rollback'), 'on_rollback', engine)
conn = engine.connect()
trans = conn.begin()
'rollback_savepoint', 'release_savepoint',
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']:
- event.listen_raw(tracker(name), 'on_%s' % name, engine)
+ event.listen(tracker(name), 'on_%s' % name, engine)
conn = engine.connect()