target.dispatch)
event.Events.listen(fn, identifier, target)
+ @classmethod
+ def unwrap(cls, identifier, args):
+ return args
+
def on_execute(self, conn, execute, clauseelement, *multiparams, **params):
"""Intercept high level execute() events."""
-
+
def on_cursor_execute(self, conn, execute, cursor, statement,
parameters, context, executemany):
"""Intercept low-level cursor execute() events."""
def on_begin(self, conn, begin):
"""Intercept begin() events."""
-
+
def on_rollback(self, conn, rollback):
"""Intercept rollback() events."""
-
+
def on_commit(self, conn, commit):
"""Intercept commit() events."""
-
+
def on_savepoint(self, conn, savepoint, name=None):
"""Intercept savepoint() events."""
-
+
def on_rollback_savepoint(self, conn, rollback_savepoint, name, context):
"""Intercept rollback_savepoint() events."""
-
+
def on_release_savepoint(self, conn, release_savepoint, name, context):
"""Intercept release_savepoint() events."""
-
+
def on_begin_twophase(self, conn, begin_twophase, xid):
"""Intercept begin_twophase() events."""
-
+
def on_prepare_twophase(self, conn, prepare_twophase, xid):
"""Intercept prepare_twophase() events."""
-
+
def on_rollback_twophase(self, conn, rollback_twophase, xid, is_prepared):
"""Intercept rollback_twophase() events."""
-
+
def on_commit_twophase(self, conn, commit_twophase, xid, is_prepared):
"""Intercept commit_twophase() events."""
-
+
class Engine(Connectable, log.Identified):
"""
Connects a :class:`~sqlalchemy.pool.Pool` and
return self.pool.unique_connection()
def _proxy_connection_cls(cls, dispatch):
+ # TODO: this is insane.
+ # consider some different method of
+ # event propagation / control, possibly
+ # requiring the (target, args) style of calling.
+ # arguments can simply be modified within the "args"
+ # dictionary.
+
+ # perhaps:
+# def execute(self, clauseelement, *multiparams, **params):
+# for fn in dispatch.on_execute:
+# ret = fn(clauseelement, multiparams, params)
+# if ret:
+# clauseelement, multiparams, params = \
+# ret['clauseelment'], ret['multiparams'], ret['params']
+
def _exec_recursive(conn, fns, orig):
if not fns:
return orig
def go(*arg, **kw):
nested = _exec_recursive(conn, fns[1:], orig)
ret = fns[0](conn, nested, *arg, **kw)
- # TODO: need to get consistent way to check
- # for "they called the fn, they didn't", or otherwise
- # make some decision here how this is to work
- #if ret is None:
- # return nested(*arg, **kw)
- #else:
return ret
return go
return
do_on_connect(conn)
- event.listen(on_connect, 'on_first_connect', pool)
- event.listen(on_connect, 'on_connect', pool)
+ event.listen_raw(on_connect, 'on_first_connect', pool)
+ event.listen_raw(on_connect, 'on_connect', pool)
def first_connect(dbapi_connection, connection_record):
c = base.Connection(engine, connection=dbapi_connection)
dialect.initialize(c)
- event.listen(first_connect, 'on_first_connect', pool)
+ event.listen_raw(first_connect, 'on_first_connect', pool)
return engine
"""
from sqlalchemy import util
+import inspect
def listen(fn, identifier, target, *args, **kw):
- """Listen for events, passing to fn."""
+ """Listen for events, passing to fn.
+
+ Event listener functions are in a consistent format::
+
+ def listen(event_name, args):
+ # ...
+
+ Where ``event_name`` is the string name, the same
+ as ``identifier``, and ``args`` is a dict containing
+ an entry for each argument. The names match those
+ of the event declaration.
+
+ """
+
+ for evt_cls in _registrars[identifier]:
+ for tgt in evt_cls.accept_with(target):
+ fn = _create_wrapper(evt_cls, fn, identifier)
+ tgt.dispatch.listen(fn, identifier, tgt, *args, **kw)
+ break
+
+def _create_wrapper(evt_cls, fn, identifier):
+ argspec = inspect.getargspec(getattr(evt_cls, identifier))
+ arg, varargs, keywords, defaults = argspec
+ def go(*args, **kw):
+ # here we are coercing the *arg, **kw to a single
+ # dictionary.
+
+ # TODO: defaults
+ if keywords:
+ kw = {keywords:kw}
+ for n, v in zip(arg[1:], args):
+ kw[n] = v
+ if varargs:
+ kw[varargs] = arg[len(args)+1:]
+
+ fn(identifier, kw)
+
+ # then here, we ask the Events subclass to interpret
+ # the dictionary back to what it wants for a return.
+
+ return evt_cls.unwrap(identifier, kw)
+
+ return util.update_wrapper(go, fn)
+
+def listen_raw(fn, identifier, target, *args, **kw):
+ """Listen for events, accepting an event function that's "raw".
+ Only the exact arguments are received in order.
+
+ This is used by SQLA internals simply to reduce the overhead
+ of creating an event dictionary for each event call.
+
+ """
# rationale - the events on ClassManager, Session, and Mapper
# will need to accept mapped classes directly as targets and know
def listen(cls, fn, identifier, target):
getattr(target.dispatch, identifier).append(fn, target)
-
+ @classmethod
+ def unwrap(cls, identifier, event):
+ return None
class _DispatchDescriptor(object):
"""Class-level attributes on _Dispatch classes."""
"""Hooks into the lifecycle of connections in a :class:`Pool`.
.. note:: :class:`PoolListener` is deprecated. Please
- refer to :func:`event.listen` as well as
+ refer to :func:`event.listen_raw` as well as
:attr:`.Pool.events`.
Usage::
listener = as_interface(listener, methods=('connect',
'first_connect', 'checkout', 'checkin'))
if hasattr(listener, 'connect'):
- event.listen(listener.connect, 'on_connect', self)
+ event.listen_raw(listener.connect, 'on_connect', self)
if hasattr(listener, 'first_connect'):
- event.listen(listener.first_connect, 'on_first_connect', self)
+ event.listen_raw(listener.first_connect, 'on_first_connect', self)
if hasattr(listener, 'checkout'):
- event.listen(listener.checkout, 'on_checkout', self)
+ event.listen_raw(listener.checkout, 'on_checkout', self)
if hasattr(listener, 'checkin'):
- event.listen(listener.checkin, 'on_checkin', self)
+ event.listen_raw(listener.checkin, 'on_checkin', self)
def connect(self, dbapi_con, con_record):
"""Allows interception of statement execution by Connections.
.. note:: :class:`ConnectionProxy` is deprecated. Please
- refer to :func:`event.listen` as well as
+ refer to :func:`event.listen_raw` as well as
:attr:`.Engine.events`.
Either or both of the ``execute()`` and ``cursor_execute()``
@classmethod
def _adapt_listener(cls, self, listener):
- event.listen(listener.execute, 'on_execute', self)
+ # TODO: suppose if new style listeners used here. then we say:
+
+ # def _wrap_in_some_way(legacy_listener):
+ # def go(clauseelement, *multiparams, **params):
+ # # 'fake' execute function. in reality just repopulates
+ # # the event with the given args in case they were modified.
+ # args.update({'clauseelement':clauseelement, 'multiparams':multiparams, 'params':params})
+ # return args
+ # def listen(evt, args):
+ # return legacy_listener(args['conn'], go, args['clauseelement'], *args['multiparams'], **args['params'])
+ #
+ # event.listen(_wrap_in_some_way(self.execute), 'on_execute', self)
+ #
+ # that way all the complex crap is left in the legacy adapter, and the "re-execute" idea is
+ # scrapped, since it was fairly pointless. The proxyconnection stuff in base.py can just
+ # iterate through listeners.
+ #
+
+ event.listen_raw(listener.execute, 'on_execute', self)
def _adapt_cursor_execute(conn, execute, cursor, statement,
parameters, context, executemany):
def _re_execute(cursor, statement, parameters, context):
return execute(cursor, statement, parameters, context, executemany)
return listener.cursor_execute(_re_execute, cursor, statement,
parameters, context, executemany)
- event.listen(_adapt_cursor_execute, 'on_cursor_execute', self)
- event.listen(listener.begin, 'on_begin', self)
- event.listen(listener.rollback, 'on_rollback', self)
- event.listen(listener.commit, 'on_commit', self)
- event.listen(listener.savepoint, 'on_savepoint', self)
- event.listen(listener.rollback_savepoint, 'on_rollback_savepoint', self)
- event.listen(listener.release_savepoint, 'on_release_savepoint', self)
- event.listen(listener.begin_twophase, 'on_begin_twophase', self)
- event.listen(listener.prepare_twophase, 'on_prepare_twophase', self)
- event.listen(listener.rollback_twophase, 'on_rollback_twophase', self)
- event.listen(listener.commit_twophase, 'on_commit_twophase', self)
+ event.listen_raw(_adapt_cursor_execute, 'on_cursor_execute', self)
+ event.listen_raw(listener.begin, 'on_begin', self)
+ event.listen_raw(listener.rollback, 'on_rollback', self)
+ event.listen_raw(listener.commit, 'on_commit', self)
+ event.listen_raw(listener.savepoint, 'on_savepoint', self)
+ event.listen_raw(listener.rollback_savepoint, 'on_rollback_savepoint', self)
+ event.listen_raw(listener.release_savepoint, 'on_release_savepoint', self)
+ event.listen_raw(listener.begin_twophase, 'on_begin_twophase', self)
+ event.listen_raw(listener.prepare_twophase, 'on_prepare_twophase', self)
+ event.listen_raw(listener.rollback_twophase, 'on_rollback_twophase', self)
+ event.listen_raw(listener.commit_twophase, 'on_commit_twophase', self)
def execute(self, conn, execute, clauseelement, *multiparams, **params):
"""Intercept high level execute() events."""
+
return execute(clauseelement, *multiparams, **params)
def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
if active_history:
target.active_history = True
event.Events.listen(fn, identifier, target)
+
+ @classmethod
+ def unwrap(cls, identifier, event):
+ return event['value']
def on_append(self, state, value, initiator):
"""Receive a collection append event.
"""An event handler for individual attribute change events.
.. note:: :class:`AttributeExtension` is deprecated. Please
- refer to :func:`event.listen` as well as
+ refer to :func:`event.listen_raw` as well as
:attr:`AttributeImpl.events`.
AttributeExtension is assembled within the descriptors associated
@classmethod
def _adapt_listener(cls, self, listener):
- event.listen(listener.append, 'on_append', self, active_history=listener.active_history)
- event.listen(listener.remove, 'on_remove', self, active_history=listener.active_history)
- event.listen(listener.set, 'on_set', self, active_history=listener.active_history)
+ event.listen_raw(listener.append, 'on_append', self, active_history=listener.active_history)
+ event.listen_raw(listener.remove, 'on_remove', self, active_history=listener.active_history)
+ event.listen_raw(listener.set, 'on_set', self, active_history=listener.active_history)
def append(self, state, value, initiator):
if manager.info.get(_INSTRUMENTOR, False):
return
- event.listen(_event_on_init, 'on_init', manager)
- event.listen(_event_on_init_failure, 'on_init_failure', manager)
- event.listen(_event_on_resurrect, 'on_resurrect', manager)
+ event.listen_raw(_event_on_init, 'on_init', manager)
+ event.listen_raw(_event_on_init_failure, 'on_init_failure', manager)
+ event.listen_raw(_event_on_resurrect, 'on_resurrect', manager)
for key, method in util.iterate_attributes(self.class_):
if isinstance(method, types.FunctionType):
if hasattr(method, '__sa_reconstructor__'):
- event.listen(method, 'on_load', manager)
+ event.listen_raw(method, 'on_load', manager)
elif hasattr(method, '__sa_validators__'):
for name in method.__sa_validators__:
self._validators[name] = method
if 'reconstruct_instance' in self.extension:
def reconstruct(instance):
self.extension.reconstruct_instance(self, instance)
- event.listen(reconstruct, 'on_load', manager)
+ event.listen_raw(reconstruct, 'on_load', manager)
manager.info[_INSTRUMENTOR] = self
dispatch = event.dispatcher(PoolEvents)
- @util.deprecated("Pool.add_listener is deprecated. Use event.listen()")
+ @util.deprecated(2.7, "Pool.add_listener is deprecated. Use event.listen()")
def add_listener(self, listener):
"""Add a :class:`.PoolListener`-like object to this pool.
options = options or config.db_opts
engine = create_engine(url, **options)
- event.listen(asserter.execute, 'on_execute', engine)
- event.listen(asserter.cursor_execute, 'on_cursor_execute', engine)
- event.listen(testing_reaper.checkout, 'on_checkout', engine.pool)
+ event.listen_raw(asserter.execute, 'on_execute', engine)
+ event.listen_raw(asserter.cursor_execute, 'on_cursor_execute', engine)
+ event.listen_raw(testing_reaper.checkout, 'on_checkout', engine.pool)
# may want to call this, results
# in first-connect initializers
class EngineEventsTest(TestBase):
+ def _assert_stmts(self, expected, received):
+ for stmt, params, posn in expected:
+ if not received:
+ assert False
+ while received:
+ teststmt, testparams, testmultiparams = \
+ received.pop(0)
+ teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
+ teststmt).strip()
+ if teststmt.startswith(stmt) and (testparams
+ == params or testparams == posn):
+ break
+
@testing.fails_on('firebird', 'Data type unknown')
- def test_execute_events(self):
+ def test_execute_events_raw(self):
stmts = []
cursor_stmts = []
cursor_stmts.append((str(statement), parameters, None))
return execute(cursor, statement, parameters, context, executemany)
- def assert_stmts(expected, received):
- for stmt, params, posn in expected:
- if not received:
- assert False
- while received:
- teststmt, testparams, testmultiparams = \
- received.pop(0)
- teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
- teststmt).strip()
- if teststmt.startswith(stmt) and (testparams
- == params or testparams == posn):
- break
for engine in [
-# engines.testing_engine(options=dict(implicit_returning=False)),
+ engines.testing_engine(options=dict(implicit_returning=False)),
engines.testing_engine(options=dict(implicit_returning=False,
strategy='threadlocal'))
]:
- event.listen(execute, 'on_execute', engine)
- event.listen(cursor_execute, 'on_cursor_execute', engine)
+ event.listen_raw(execute, 'on_execute', engine)
+ event.listen_raw(cursor_execute, 'on_cursor_execute', engine)
m = MetaData(engine)
t1 = Table('t1', m,
('select * from t1', {}, ()), ('DROP TABLE t1'
, {}, ())] # bind param name 'lower_2' might
# be incorrect
- assert_stmts(compiled, stmts)
- assert_stmts(cursor, cursor_stmts)
+ self._assert_stmts(compiled, stmts)
+ self._assert_stmts(cursor, cursor_stmts)
- def test_options(self):
+ @testing.fails_on('firebird', 'Data type unknown')
+ def _broken_test_execute_events_generic(self):
+
+ stmts = []
+ cursor_stmts = []
+
+ def listen(event_name, args):
+ if event_name == 'on_execute':
+ clauseelement, params, multiparams = \
+ args['clauseelement'], args['params'], args['multiparams']
+ stmts.append((str(clauseelement), params, multiparams))
+ elif event_name == 'on_cursor_execute':
+ statement, parameters = args['statement'], args['parameters']
+ cursor_stmts.append((str(statement), parameters, None))
+
+ for engine in [
+ engines.testing_engine(options=dict(implicit_returning=False)),
+ engines.testing_engine(options=dict(implicit_returning=False,
+ strategy='threadlocal'))
+ ]:
+ event.listen(listen, 'on_execute', engine)
+ event.listen(listen, 'on_cursor_execute', engine)
+
+ m = MetaData(engine)
+ t1 = Table('t1', m,
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'),
+ primary_key=True)
+ )
+ m.create_all()
+ try:
+ t1.insert().execute(c1=5, c2='some data')
+ t1.insert().execute(c1=6)
+ eq_(engine.execute('select * from t1').fetchall(), [(5,
+ 'some data'), (6, 'foo')])
+ finally:
+ m.drop_all()
+ engine.dispose()
+ compiled = [('CREATE TABLE t1', {}, None),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
+ 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6}, None), ('select * from t1', {},
+ None), ('DROP TABLE t1', {}, None)]
+ if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
+ # eexecute_pk_sequence
+ # s:
+ cursor = [
+ ('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
+ : 5}, (5, 'some data')),
+ ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
+ (6, 'foo')),
+ ('select * from t1', {}, ()),
+ ('DROP TABLE t1', {}, ()),
+ ]
+ else:
+ insert2_params = 6, 'Foo'
+ if testing.against('oracle+zxjdbc'):
+ insert2_params += (ReturningParam(12), )
+ cursor = [('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
+ , 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)', {'c1': 6,
+ 'lower_2': 'Foo'}, insert2_params),
+ ('select * from t1', {}, ()), ('DROP TABLE t1'
+ , {}, ())] # bind param name 'lower_2' might
+ # be incorrect
+ self._assert_stmts(compiled, stmts)
+ self._assert_stmts(cursor, cursor_stmts)
+
+ def test_options_raw(self):
track = []
def on_execute(conn, exec_, *args, **kw):
track.append('execute')
return exec_(*args, **kw)
engine = engines.testing_engine()
- event.listen(on_execute, 'on_execute', engine)
- event.listen(on_cursor_execute, 'on_cursor_execute', engine)
+ event.listen_raw(on_execute, 'on_execute', engine)
+ event.listen_raw(on_cursor_execute, 'on_cursor_execute', engine)
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
eq_(c2._execution_options, {'foo':'bar'})
eq_(track, ['execute', 'cursor_execute'])
- def test_transactional(self):
+ def test_transactional_raw(self):
track = []
def tracker(name):
def go(conn, exec_, *args, **kw):
return go
engine = engines.testing_engine()
- event.listen(tracker('execute'), 'on_execute', engine)
- event.listen(tracker('cursor_execute'), 'on_cursor_execute', engine)
- event.listen(tracker('begin'), 'on_begin', engine)
- event.listen(tracker('commit'), 'on_commit', engine)
- event.listen(tracker('rollback'), 'on_rollback', engine)
+ event.listen_raw(tracker('execute'), 'on_execute', engine)
+ event.listen_raw(tracker('cursor_execute'), 'on_cursor_execute', engine)
+ event.listen_raw(tracker('begin'), 'on_begin', engine)
+ event.listen_raw(tracker('commit'), 'on_commit', engine)
+ event.listen_raw(tracker('rollback'), 'on_rollback', engine)
conn = engine.connect()
trans = conn.begin()
@testing.requires.savepoints
@testing.requires.two_phase_transactions
- def test_transactional_advanced(self):
+ def test_transactional_advanced_raw(self):
track = []
def tracker(name):
def go(conn, exec_, *args, **kw):
'rollback_savepoint', 'release_savepoint',
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']:
- event.listen(tracker(name), 'on_%s' % name, engine)
+ event.listen_raw(tracker(name), 'on_%s' % name, engine)
conn = engine.connect()
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
)
+
class ProxyConnectionTest(TestBase):
"""These are the same tests as EngineEventsTest, except using