def my_on_connect(dbapi_con, connection_record):
print "New DBAPI connection:", dbapi_con
- listen(my_on_connect, 'on_connect', Pool)
+ listen(Pool, 'on_connect', my_on_connect)
Targets
-------
my_engine = create_engine('postgresql://ed@localhost/test')
# associate listener with all instances of Pool
- listen(my_on_connect, 'on_connect', Pool)
+ listen(Pool, 'on_connect', my_on_connect)
# associate listener with all instances of Pool
# via the Engine class
- listen(my_on_connect, 'on_connect', Engine)
+ listen(Engine, 'on_connect', my_on_connect)
# associate listener with my_pool
- listen(my_on_connect, 'on_connect', my_pool)
+ listen(my_pool, 'on_connect', my_on_connect)
# associate listener with my_engine.pool
- listen(my_on_connect, 'on_connect', my_engine)
+ listen(my_engine, 'on_connect', my_on_connect)
Modifiers
----------
# setup listener on UserContact.phone attribute, instructing
# it to use the return value
- listen(validate_phone, 'on_set', UserContact.phone, retval=True)
+ listen(UserContact.phone, 'on_set', validate_phone, retval=True)
Event Reference
----------------
from sqlalchemy import event
event.listen(
- AddConstraint(constraint),
+ users,
"on_after_create",
- users
+ AddConstraint(constraint)
)
event.listen(
- DropConstraint(constraint),
+ users,
"on_before_drop",
- users
+ DropConstraint(constraint)
)
{sql}users.create(engine)
Postgresql and not other databases, we could limit its usage to just that dialect::
event.listen(
- AddConstraint(constraint).execute_if(dialect='postgresql'),
+ users,
'on_after_create',
- users
+ AddConstraint(constraint).execute_if(dialect='postgresql')
)
event.listen(
- DropConstraint(constraint).execute_if(dialect='postgresql'),
+ users,
'on_before_drop',
- users
+ DropConstraint(constraint).execute_if(dialect='postgresql')
)
Or to any set of dialects::
event.listen(
- AddConstraint(constraint).execute_if(dialect=('postgresql', 'mysql')),
+ users,
"on_after_create",
- users
+ AddConstraint(constraint).execute_if(dialect=('postgresql', 'mysql'))
)
event.listen(
- DropConstraint(constraint).execute_if(dialect=('postgresql', 'mysql')),
+ users,
"on_before_drop",
- users
+ DropConstraint(constraint).execute_if(dialect=('postgresql', 'mysql'))
)
When using a callable, the callable is passed the ddl element, the
return not should_create(ddl, target, connection, **kw)
event.listen(
- AddConstraint(constraint).execute_if(callable_=should_create),
+ users,
"on_after_create",
- users
+ AddConstraint(constraint).execute_if(callable_=should_create)
)
event.listen(
- DropConstraint(constraint).execute_if(callable_=should_drop),
+ users,
"on_before_drop",
- users
+ DropConstraint(constraint).execute_if(callable_=should_drop)
)
{sql}users.create(engine)
.. sourcecode:: python+sql
event.listen(
+ metadata,
+ "on_after_create",
DDL("ALTER TABLE users ADD CONSTRAINT "
"cst_user_name_length "
- " CHECK (length(user_name) >= 8)"),
- "on_after_create",
- metadata
+ " CHECK (length(user_name) >= 8)")
)
A more comprehensive method of creating libraries of DDL constructs is to use
def set_(instance, value, oldvalue, initiator):
instance.receive_change_event("set", key, value, oldvalue)
- event.listen(append, 'on_append', inst)
- event.listen(remove, 'on_remove', inst)
- event.listen(set_, 'on_set', inst)
+ event.listen(inst, 'on_append', append)
+ event.listen(inst, 'on_remove', remove)
+ event.listen(inst, 'on_set', set_)
if __name__ == '__main__':
Base = declarative_base(cls=Base)
- event.listen(configure_listener, 'on_attribute_instrument', Base)
+ event.listen(Base, 'on_attribute_instrument', configure_listener)
class MyMappedClass(Base):
__tablename__ = "mytable"
return
do_on_connect(conn)
- event.listen(on_connect, 'on_first_connect', pool)
- event.listen(on_connect, 'on_connect', pool)
+ event.listen(pool, 'on_first_connect', on_connect)
+ event.listen(pool, 'on_connect', on_connect)
def first_connect(dbapi_connection, connection_record):
c = base.Connection(engine, connection=dbapi_connection)
dialect.initialize(c)
- event.listen(first_connect, 'on_first_connect', pool)
+ event.listen(pool, 'on_first_connect', first_connect)
return engine
class TLEvents(events.EngineEvents):
@classmethod
- def listen(cls, fn, identifier, target):
+ def listen(cls, target, identifier, fn):
if target.TLConnection is TLConnection:
target.TLConnection = base._listener_connection_cls(
TLConnection,
target.dispatch)
- events.EngineEvents.listen(fn, identifier, target)
+ events.EngineEvents.listen(target, identifier, fn)
class TLEngine(base.Engine):
"""An Engine that includes support for thread-local managed transactions."""
CANCEL = util.symbol('CANCEL')
NO_RETVAL = util.symbol('NO_RETVAL')
-def listen(fn, identifier, target, *args, **kw):
+def listen(target, identifier, fn, *args, **kw):
"""Register a listener function for the given target.
"""
for evt_cls in _registrars[identifier]:
tgt = evt_cls.accept_with(target)
if tgt is not None:
- tgt.dispatch.listen(fn, identifier, tgt, *args, **kw)
+ tgt.dispatch.listen(tgt, identifier, fn, *args, **kw)
return
raise exc.InvalidRequestError("No such event %s for target %s" %
(identifier,target))
-def remove(fn, identifier, target):
+def remove(target, identifier, fn):
"""Remove an event listener.
Note that some event removals, particularly for those event dispatchers
"""
for evt_cls in _registrars[identifier]:
for tgt in evt_cls.accept_with(target):
- tgt.dispatch.remove(fn, identifier, tgt, *args, **kw)
+ tgt.dispatch.remove(tgt, identifier, fn)
return
_registrars = util.defaultdict(list)
return None
@classmethod
- def listen(cls, fn, identifier, target, propagate=False):
+ def listen(cls, target, identifier, fn, propagate=False):
getattr(target.dispatch, identifier).append(fn, target, propagate)
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
getattr(target.dispatch, identifier).remove(fn, target)
@classmethod
connection.execute("ALTER TABLE %s SET name=foo_%s" %
(target.name, target.name))
- event.listen(on_after_create, "on_after_create", some_table)
+ event.listen(some_table, "on_after_create", on_after_create)
DDL events integrate closely with the
:class:`.DDL` class and the :class:`.DDLElement` hierarchy
from sqlalchemy import DDL
event.listen(
- DDL("ALTER TABLE %(table)s SET name=foo_%(table)s"),
+ some_table,
"on_after_create",
- some_table
+ DDL("ALTER TABLE %(table)s SET name=foo_%(table)s")
)
The methods here define the name of an event as well
def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
"handle an on checkout event"
- events.listen(my_on_checkout, 'on_checkout', Pool)
+ events.listen(Pool, 'on_checkout', my_on_checkout)
In addition to accepting the :class:`.Pool` class and :class:`.Pool` instances,
:class:`.PoolEvents` also accepts :class:`.Engine` objects and
engine = create_engine("postgresql://scott:tiger@localhost/test")
# will associate with engine.pool
- events.listen(my_on_checkout, 'on_checkout', engine)
+ events.listen(engine, 'on_checkout', my_on_checkout)
"""
log.info("Received statement: %s" % clauseelement)
engine = create_engine('postgresql://scott:tiger@localhost/test')
- event.listen(on_before_execute, "on_before_execute", engine)
+ event.listen(engine, "on_before_execute", on_before_execute)
Some events allow modifiers to the listen() function.
"""
@classmethod
- def listen(cls, fn, identifier, target, retval=False):
+ def listen(cls, target, identifier, fn, retval=False):
from sqlalchemy.engine.base import Connection, \
_listener_connection_cls
if target.Connection is Connection:
"'on_before_cursor_execute' engine "
"event listeners accept the 'retval=True' "
"argument.")
- event.Events.listen(fn, identifier, target)
+ event.Events.listen(target, identifier, fn)
def on_before_execute(self, conn, clauseelement, multiparams, params):
"""Intercept high level execute() events."""
listener = util.as_interface(listener, methods=('connect',
'first_connect', 'checkout', 'checkin'))
if hasattr(listener, 'connect'):
- event.listen(listener.connect, 'on_connect', self)
+ event.listen(self, 'on_connect', listener.connect)
if hasattr(listener, 'first_connect'):
- event.listen(listener.first_connect, 'on_first_connect', self)
+ event.listen(self, 'on_first_connect', listener.first_connect)
if hasattr(listener, 'checkout'):
- event.listen(listener.checkout, 'on_checkout', self)
+ event.listen(self, 'on_checkout', listener.checkout)
if hasattr(listener, 'checkin'):
- event.listen(listener.checkin, 'on_checkin', self)
+ event.listen(self, 'on_checkin', listener.checkin)
def connect(self, dbapi_con, con_record):
clauseelement, *multiparams,
**params)
- event.listen(adapt_execute, 'on_before_execute', self)
+ event.listen(self, 'on_before_execute', adapt_execute)
def adapt_cursor_execute(conn, cursor, statement,
parameters,context, executemany, ):
executemany,
)
- event.listen(adapt_cursor_execute, 'on_before_cursor_execute',
- self)
+ event.listen(self, 'on_before_cursor_execute', adapt_cursor_execute)
def do_nothing_callback(*arg, **kw):
pass
return util.update_wrapper(go, fn)
- event.listen(adapt_listener(listener.begin), 'on_begin', self)
- event.listen(adapt_listener(listener.rollback), 'on_rollback',
- self)
- event.listen(adapt_listener(listener.commit), 'on_commit', self)
- event.listen(adapt_listener(listener.savepoint), 'on_savepoint'
- , self)
- event.listen(adapt_listener(listener.rollback_savepoint),
- 'on_rollback_savepoint', self)
- event.listen(adapt_listener(listener.release_savepoint),
- 'on_release_savepoint', self)
- event.listen(adapt_listener(listener.begin_twophase),
- 'on_begin_twophase', self)
- event.listen(adapt_listener(listener.prepare_twophase),
- 'on_prepare_twophase', self)
- event.listen(adapt_listener(listener.rollback_twophase),
- 'on_rollback_twophase', self)
- event.listen(adapt_listener(listener.commit_twophase),
- 'on_commit_twophase', self)
+ event.listen(self, 'on_begin', adapt_listener(listener.begin))
+ event.listen(self, 'on_rollback',
+ adapt_listener(listener.rollback))
+ event.listen(self, 'on_commit', adapt_listener(listener.commit))
+ event.listen(self, 'on_savepoint',
+ adapt_listener(listener.savepoint))
+ event.listen(self, 'on_rollback_savepoint',
+ adapt_listener(listener.rollback_savepoint))
+ event.listen(self, 'on_release_savepoint',
+ adapt_listener(listener.release_savepoint))
+ event.listen(self, 'on_begin_twophase',
+ adapt_listener(listener.begin_twophase))
+ event.listen(self, 'on_prepare_twophase',
+ adapt_listener(listener.prepare_twophase))
+ event.listen(self, 'on_rollback_twophase',
+ adapt_listener(listener.rollback_twophase))
+ event.listen(self, 'on_commit_twophase',
+ adapt_listener(listener.commit_twophase))
def execute(self, conn, execute, clauseelement, *multiparams, **params):
passive=PASSIVE_NO_FETCH)
if uselist:
- event.listen(append, "on_append", attribute, retval=False, raw=True)
+ event.listen(attribute, "on_append", append, retval=False, raw=True)
else:
- event.listen(set_, "on_set", attribute, retval=False, raw=True)
+ event.listen(attribute, "on_set", set_, retval=False, raw=True)
# TODO: need coverage in test/orm/ of remove event
- event.listen(remove, "on_remove", attribute, retval=False, raw=True)
+ event.listen(attribute, "on_remove", remove, retval=False, raw=True)
class History(tuple):
"""A 3-tuple of added, unchanged and deleted values,
def reconstruct(instance):
ls_meth(self, instance)
return reconstruct
- event.listen(go(ls_meth), 'on_load',
- self.class_manager, raw=False, propagate=True)
+ event.listen(self.class_manager, 'on_load',
+ go(ls_meth), raw=False, propagate=True)
elif meth == 'init_instance':
def go(ls_meth):
def init_instance(instance, args, kwargs):
self.class_manager.original_init,
instance, args, kwargs)
return init_instance
- event.listen(go(ls_meth), 'on_init',
- self.class_manager, raw=False, propagate=True)
+ event.listen(self.class_manager, 'on_init',
+ go(ls_meth), raw=False, propagate=True)
elif meth == 'init_failed':
def go(ls_meth):
def init_failed(instance, args, kwargs):
instance, args, kwargs)
return init_failed
- event.listen(go(ls_meth), 'on_init_failure',
- self.class_manager, raw=False, propagate=True)
+ event.listen(self.class_manager, 'on_init_failure',
+ go(ls_meth), raw=False, propagate=True)
else:
- event.listen(ls_meth, "on_%s" % meth, self,
+ event.listen(self, "on_%s" % meth, ls_meth,
raw=False, retval=True, propagate=True)
@classmethod
def _adapt_listener(cls, self, listener):
- event.listen(listener.before_commit, 'on_before_commit', self)
- event.listen(listener.after_commit, 'on_after_commit', self)
- event.listen(listener.after_rollback, 'on_after_rollback', self)
- event.listen(listener.before_flush, 'on_before_flush', self)
- event.listen(listener.after_flush, 'on_after_flush', self)
- event.listen(listener.after_flush_postexec, 'on_after_flush_postexec', self)
- event.listen(listener.after_begin, 'on_after_begin', self)
- event.listen(listener.after_attach, 'on_after_attach', self)
- event.listen(listener.after_bulk_update, 'on_after_bulk_update', self)
- event.listen(listener.after_bulk_delete, 'on_after_bulk_delete', self)
+ event.listen(self, 'on_before_commit', listener.before_commit)
+ event.listen(self, 'on_after_commit', listener.after_commit)
+ event.listen(self, 'on_after_rollback', listener.after_rollback)
+ event.listen(self, 'on_before_flush', listener.before_flush)
+ event.listen(self, 'on_after_flush', listener.after_flush)
+ event.listen(self, 'on_after_flush_postexec', listener.after_flush_postexec)
+ event.listen(self, 'on_after_begin', listener.after_begin)
+ event.listen(self, 'on_after_attach', listener.after_attach)
+ event.listen(self, 'on_after_bulk_update', listener.after_bulk_update)
+ event.listen(self, 'on_after_bulk_delete', listener.after_bulk_delete)
def before_commit(self, session):
"""Execute right before commit is called.
@classmethod
def _adapt_listener(cls, self, listener):
- event.listen(listener.append, 'on_append', self,
+ event.listen(self, 'on_append', listener.append,
active_history=listener.active_history,
raw=True, retval=True)
- event.listen(listener.remove, 'on_remove', self,
+ event.listen(self, 'on_remove', listener.remove,
active_history=listener.active_history,
raw=True, retval=True)
- event.listen(listener.set, 'on_set', self,
+ event.listen(self, 'on_set', listener.set,
active_history=listener.active_history,
raw=True, retval=True)
return None
@classmethod
- def listen(cls, fn, identifier, target, propagate=False):
- event.Events.listen(fn, identifier, target, propagate=propagate)
+ def listen(cls, target, identifier, fn, propagate=False):
+ event.Events.listen(target, identifier, fn, propagate=propagate)
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
raise NotImplementedError("Removal of instrumentation events not yet implemented")
def on_class_instrument(self, cls):
return None
@classmethod
- def listen(cls, fn, identifier, target, raw=False, propagate=False):
+ def listen(cls, target, identifier, fn, raw=False, propagate=False):
if not raw:
orig_fn = fn
def wrap(state, *arg, **kw):
return orig_fn(state.obj(), *arg, **kw)
fn = wrap
- event.Events.listen(fn, identifier, target, propagate=propagate)
+ event.Events.listen(target, identifier, fn, propagate=propagate)
if propagate:
for mgr in target.subclass_managers(True):
- event.Events.listen(fn, identifier, mgr, True)
+ event.Events.listen(mgr, identifier, fn, True)
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
raise NotImplementedError("Removal of instance events not yet implemented")
def on_init(self, target, args, kwargs):
# associate the listener function with SomeMappedClass,
# to execute during the "on_before_insert" hook
- event.listen(my_before_insert_listener, 'on_before_insert', SomeMappedClass)
+ event.listen(SomeMappedClass, 'on_before_insert', my_before_insert_listener)
Available targets include mapped classes, instances of
:class:`.Mapper` (i.e. returned by :func:`.mapper`,
log.debug("Instance %s being inserted" % target)
# attach to all mappers
- event.listen(some_listener, 'on_before_insert', mapper)
+ event.listen(mapper, 'on_before_insert', some_listener)
Mapper events provide hooks into critical sections of the
mapper, including those related to object instrumentation,
return target
@classmethod
- def listen(cls, fn, identifier, target,
+ def listen(cls, target, identifier, fn,
raw=False, retval=False, propagate=False):
from sqlalchemy.orm.interfaces import EXT_CONTINUE
if propagate:
for mapper in target.self_and_descendants:
- event.Events.listen(fn, identifier, mapper, propagate=True)
+ event.Events.listen(mapper, identifier, fn, propagate=True)
else:
- event.Events.listen(fn, identifier, target)
+ event.Events.listen(target, identifier, fn)
def on_instrument_class(self, mapper, class_):
"""Receive a class when the mapper is first constructed, and has
"""
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
raise NotImplementedError("Removal of mapper events not yet implemented")
class SessionEvents(event.Events):
Session = sessionmaker()
- event.listen(my_before_commit, "on_before_commit", Session)
+ event.listen(Session, "on_before_commit", my_before_commit)
The :func:`~.event.listen` function will accept
:class:`.Session` objects as well as the return result
return None
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
raise NotImplementedError("Removal of session events not yet implemented")
def on_before_commit(self, session):
def my_append_listener(target, value, initiator):
print "received append event for target: %s" % target
- event.listen(my_append_listener, 'on_append', MyClass.collection)
+ event.listen(MyClass.collection, 'on_append', my_append_listener)
Listeners have the option to return a possibly modified version
of the value, when the ``retval=True`` flag is passed
# setup listener on UserContact.phone attribute, instructing
# it to use the return value
- listen(validate_phone, 'on_set', UserContact.phone, retval=True)
+ listen(UserContact.phone, 'on_set', validate_phone, retval=True)
A validation function like the above can also raise an exception
such as :class:`ValueError` to halt the operation.
"""
@classmethod
- def listen(cls, fn, identifier, target, active_history=False,
+ def listen(cls, target, identifier, fn, active_history=False,
raw=False, retval=False,
propagate=False):
if active_history:
return orig_fn(target, value, *arg)
fn = wrap
- event.Events.listen(fn, identifier, target, propagate)
+ event.Events.listen(target, identifier, fn, propagate)
if propagate:
from sqlalchemy.orm.instrumentation import manager_of_class
manager = manager_of_class(target.class_)
for mgr in manager.subclass_managers(True):
- event.Events.listen(fn, identifier, mgr[target.key], True)
+ event.Events.listen(mgr[target.key], identifier, fn, True)
@classmethod
- def remove(cls, fn, identifier, target):
+ def remove(cls, target, identifier, fn):
raise NotImplementedError("Removal of attribute events not yet implemented")
def on_append(self, target, value, initiator):
if manager.info.get(_INSTRUMENTOR, False):
return
- event.listen(_event_on_init, 'on_init', manager, raw=True)
- event.listen(_event_on_resurrect, 'on_resurrect', manager, raw=True)
+ event.listen(manager, 'on_init', _event_on_init, raw=True)
+ event.listen(manager, 'on_resurrect', _event_on_resurrect, raw=True)
for key, method in util.iterate_attributes(self.class_):
if isinstance(method, types.FunctionType):
if hasattr(method, '__sa_reconstructor__'):
self._reconstructor = method
- event.listen(_event_on_load, 'on_load', manager, raw=True)
+ event.listen(manager, 'on_load', _event_on_load, raw=True)
elif hasattr(method, '__sa_validators__'):
for name in method.__sa_validators__:
self._validators[name] = method
self.dispatch.update(_dispatch, only_propagate=False)
if events:
for fn, target in events:
- event.listen(fn, target, self)
+ event.listen(self, target, fn)
if listeners:
util.warn_deprecated(
"The 'listeners' argument to Pool (and "
def adapt_listener(target, connection, **kw):
listener(event_name, target, connection, **kw)
- event.listen(adapt_listener,
- "on_" + event_name.replace('-', '_'), self)
+ event.listen(self, "on_" + event_name.replace('-', '_'), adapt_listener)
def _set_parent(self, metadata):
metadata._add_table(self.name, self.schema, self)
return table in set(kw['tables']) and \
bind.dialect.supports_alter
- event.listen(AddConstraint(self, on=supports_alter), "on_after_create", table.metadata)
- event.listen(DropConstraint(self, on=supports_alter), "on_before_drop", table.metadata)
+ event.listen(table.metadata, "on_after_create", AddConstraint(self, on=supports_alter))
+ event.listen(table.metadata, "on_before_drop", DropConstraint(self, on=supports_alter))
def copy(self, **kw):
def adapt_listener(target, connection, **kw):
listener(event, target, connection, **kw)
- event.listen(adapt_listener,
- "on_" + event_name.replace('-', '_'), self)
+ event.listen(self, "on_" + event_name.replace('-', '_'), adapt_listener)
def create_all(self, bind=None, tables=None, checkfirst=True):
"""Create all tables stored in this metadata.
itself an event receiving callable::
event.listen(
- AddConstraint(constraint).execute_if(dialect='postgresql'),
+ users,
'on_after_create',
- users
+ AddConstraint(constraint).execute_if(dialect='postgresql')
)
See also:
target, connection, **kw):
return connection.execute(self.against(target))
- event.listen(call_event, "on_" + event_name.replace('-', '_'), target)
+ event.listen(target, "on_" + event_name.replace('-', '_'), call_event)
@expression._generative
def against(self, target):
Used to provide a wrapper for event listening::
event.listen(
- DDL("my_ddl").execute_if(dialect='postgresql'),
+ metadata,
'on_before_create',
- metadata
+ DDL("my_ddl").execute_if(dialect='postgresql')
)
:param dialect: May be a string, tuple or a callable
from sqlalchemy import event, DDL
tbl = Table('users', metadata, Column('uid', Integer))
- event.listen(DDL('DROP TRIGGER users_trigger'), 'on_before_create', tbl)
+ event.listen(tbl, 'on_before_create', DDL('DROP TRIGGER users_trigger'))
spow = DDL('ALTER TABLE %(table)s SET secretpowers TRUE')
- event.listen(spow.execute_if(dialect='somedb'), 'on_after_create', tbl)
+ event.listen(tbl, 'on_after_create', spow.execute_if(dialect='somedb'))
drop_spow = DDL('ALTER TABLE users SET secretpowers FALSE')
connection.execute(drop_spow)
def listen(x, y):
pass
- event.listen(listen, "on_event_one", Target)
+ event.listen(Target, "on_event_one", listen)
eq_(len(Target().dispatch.on_event_one), 1)
eq_(len(Target().dispatch.on_event_two), 0)
pass
t1 = Target()
- event.listen(listen, "on_event_one", t1)
+ event.listen(t1, "on_event_one", listen)
eq_(len(Target().dispatch.on_event_one), 0)
eq_(len(t1.dispatch.on_event_one), 1)
def listen_two(x, y):
pass
- event.listen(listen_one, "on_event_one", Target)
+ event.listen(Target, "on_event_one", listen_one)
t1 = Target()
- event.listen(listen_two, "on_event_one", t1)
+ event.listen(t1, "on_event_one", listen_two)
eq_(len(Target().dispatch.on_event_one), 1)
eq_(len(t1.dispatch.on_event_one), 2)
def listen_three(x, y):
pass
- event.listen(listen_three, "on_event_one", Target)
+ event.listen(Target, "on_event_one", listen_three)
eq_(len(Target().dispatch.on_event_one), 2)
eq_(len(t1.dispatch.on_event_one), 3)
def listen_four(x, y):
pass
- event.listen(listen_one, "on_event_one", TargetOne)
- event.listen(listen_two, "on_event_one", TargetTwo)
+ event.listen(TargetOne, "on_event_one", listen_one)
+ event.listen(TargetTwo, "on_event_one", listen_two)
eq_(
list(TargetOne().dispatch.on_event_one),
t1 = TargetOne()
t2 = TargetTwo()
- event.listen(listen_three, "on_event_one", t1)
- event.listen(listen_four, "on_event_one", t2)
+ event.listen(t1, "on_event_one", listen_three)
+ event.listen(t2, "on_event_one", listen_four)
eq_(
list(t1.dispatch.on_event_one),
def listen(x, y):
pass
- event.listen(listen, "on_event_one", "one")
+ event.listen("one", "on_event_one", listen)
eq_(
list(Target().dispatch.on_event_one),
class TargetEvents(event.Events):
@classmethod
- def listen(cls, fn, identifier, target, add=False):
+ def listen(cls, target, identifier, fn, add=False):
if add:
def adapt(x, y):
fn(x + y)
else:
adapt = fn
- event.Events.listen(adapt, identifier, target)
+ event.Events.listen(target, identifier, adapt)
def on_event_one(self, x, y):
pass
def listen_two(x, y):
result.append((x, y))
- event.listen(listen_one, "on_event_one", Target, add=True)
- event.listen(listen_two, "on_event_one", Target)
+ event.listen(Target, "on_event_one", listen_one, add=True)
+ event.listen(Target, "on_event_one", listen_two)
t1 = Target()
t1.dispatch.on_event_one(5, 7)
t1 = Target()
- event.listen(listen_one, "on_event_one", t1, propagate=True)
- event.listen(listen_two, "on_event_two", t1)
+ event.listen(t1, "on_event_one", listen_one, propagate=True)
+ event.listen(t1, "on_event_two", listen_two)
t2 = Target()
def test_table_create_before(self):
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.before_create, 'on_before_create', table)
+ event.listen(table, 'on_before_create', canary.before_create)
table.create(bind)
assert canary.state == 'before-create'
def test_table_create_after(self):
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.after_create, 'on_after_create', table)
+ event.listen(table, 'on_after_create', canary.after_create)
canary.state = 'skipped'
table.create(bind)
def test_table_create_both(self):
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.before_create, 'on_before_create', table)
- event.listen(canary.after_create, 'on_after_create', table)
+ event.listen(table, 'on_before_create', canary.before_create)
+ event.listen(table, 'on_after_create', canary.after_create)
table.create(bind)
assert canary.state == 'after-create'
def test_table_drop_before(self):
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.before_drop, 'on_before_drop', table)
+ event.listen(table, 'on_before_drop', canary.before_drop)
table.create(bind)
assert canary.state is None
def test_table_drop_after(self):
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.after_drop, 'on_after_drop', table)
+ event.listen(table, 'on_after_drop', canary.after_drop)
table.create(bind)
assert canary.state is None
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.before_drop, 'on_before_drop', table)
- event.listen(canary.after_drop, 'on_after_drop', table)
+ event.listen(table, 'on_before_drop', canary.before_drop)
+ event.listen(table, 'on_after_drop', canary.after_drop)
table.create(bind)
assert canary.state is None
table, bind = self.table, self.bind
canary = self.Canary(table, bind)
- event.listen(canary.before_create, 'on_before_create', table)
- event.listen(canary.after_create, 'on_after_create', table)
- event.listen(canary.before_drop, 'on_before_drop', table)
- event.listen(canary.after_drop, 'on_after_drop', table)
+ event.listen(table, 'on_before_create', canary.before_create)
+ event.listen(table, 'on_after_create', canary.after_create)
+ event.listen(table, 'on_before_drop', canary.before_drop)
+ event.listen(table, 'on_after_drop', canary.after_drop)
assert canary.state is None
table.create(bind)
def test_table_create_before(self):
metadata, bind = self.metadata, self.bind
canary = self.Canary(metadata, bind)
- event.listen(canary.before_create, 'on_before_create', metadata)
+ event.listen(metadata, 'on_before_create', canary.before_create)
metadata.create_all(bind)
assert canary.state == 'before-create'
def test_metadata_create_after(self):
metadata, bind = self.metadata, self.bind
canary = self.Canary(metadata, bind)
- event.listen(canary.after_create, 'on_after_create', metadata)
+ event.listen(metadata, 'on_after_create', canary.after_create)
canary.state = 'skipped'
metadata.create_all(bind)
metadata, bind = self.metadata, self.bind
canary = self.Canary(metadata, bind)
- event.listen(canary.before_create, 'on_before_create', metadata)
- event.listen(canary.after_create, 'on_after_create', metadata)
+ event.listen(metadata, 'on_before_create', canary.before_create)
+ event.listen(metadata, 'on_after_create', canary.after_create)
metadata.create_all(bind)
assert canary.state == 'after-create'
metadata, table, bind = self.metadata, self.table, self.bind
table_canary = self.Canary(table, bind)
- event.listen(table_canary.before_create, 'on_before_create', table)
+ event.listen(table, 'on_before_create', table_canary.before_create)
metadata_canary = self.Canary(metadata, bind)
- event.listen(metadata_canary.before_create, 'on_before_create',
- metadata)
+ event.listen(metadata, 'on_before_create', metadata_canary.before_create)
self.table.create(self.bind)
assert metadata_canary.state == None
def test_table_standalone(self):
users, engine = self.users, self.engine
- event.listen(DDL('mxyzptlk'), 'on_before_create', users)
- event.listen(DDL('klptzyxm'), 'on_after_create', users)
- event.listen(DDL('xyzzy'), 'on_before_drop', users)
- event.listen(DDL('fnord'), 'on_after_drop', users)
+ event.listen(users, 'on_before_create', DDL('mxyzptlk'))
+ event.listen(users, 'on_after_create', DDL('klptzyxm'))
+ event.listen(users, 'on_before_drop', DDL('xyzzy'))
+ event.listen(users, 'on_after_drop', DDL('fnord'))
users.create()
strings = [str(x) for x in engine.mock]
def test_table_by_metadata(self):
metadata, users, engine = self.metadata, self.users, self.engine
- event.listen(DDL('mxyzptlk'), 'on_before_create', users)
- event.listen(DDL('klptzyxm'), 'on_after_create', users)
- event.listen(DDL('xyzzy'), 'on_before_drop', users)
- event.listen(DDL('fnord'), 'on_after_drop', users)
+ event.listen(users, 'on_before_create', DDL('mxyzptlk'))
+ event.listen(users, 'on_after_create', DDL('klptzyxm'))
+ event.listen(users, 'on_before_drop', DDL('xyzzy'))
+ event.listen(users, 'on_after_drop', DDL('fnord'))
metadata.create_all()
strings = [str(x) for x in engine.mock]
def test_metadata(self):
metadata, engine = self.metadata, self.engine
- event.listen(DDL('mxyzptlk'), 'on_before_create', metadata)
- event.listen(DDL('klptzyxm'), 'on_after_create', metadata)
- event.listen(DDL('xyzzy'), 'on_before_drop', metadata)
- event.listen(DDL('fnord'), 'on_after_drop', metadata)
+ event.listen(metadata, 'on_before_create', DDL('mxyzptlk'))
+ event.listen(metadata, 'on_after_create', DDL('klptzyxm'))
+ event.listen(metadata, 'on_before_drop', DDL('xyzzy'))
+ event.listen(metadata, 'on_after_drop', DDL('fnord'))
metadata.create_all()
strings = [str(x) for x in engine.mock]
# 'inline_ddl' flag is set to False
event.listen(
- AddConstraint(constraint).execute_if(dialect='postgresql'),
+ users,
'on_after_create',
- users
+ AddConstraint(constraint).execute_if(dialect='postgresql'),
)
event.listen(
- DropConstraint(constraint).execute_if(dialect='postgresql'),
+ users,
'on_before_drop',
- users
+ DropConstraint(constraint).execute_if(dialect='postgresql'),
)
metadata.create_all(bind=nonpg_mock)
engines.testing_engine(options=dict(implicit_returning=False,
strategy='threadlocal'))
]:
- event.listen(execute, 'on_before_execute', engine)
- event.listen(cursor_execute, 'on_before_cursor_execute', engine)
+ event.listen(engine, 'on_before_execute', execute)
+ event.listen(engine, 'on_before_cursor_execute', cursor_execute)
m = MetaData(engine)
t1 = Table('t1', m,
canary.append('cursor_execute')
engine = engines.testing_engine()
- event.listen(on_execute, 'on_before_execute', engine)
- event.listen(on_cursor_execute, 'on_before_cursor_execute', engine)
+ event.listen(engine, 'on_before_execute', on_execute)
+ event.listen(engine, 'on_before_cursor_execute', on_cursor_execute)
conn = engine.connect()
c2 = conn.execution_options(foo='bar')
eq_(c2._execution_options, {'foo':'bar'})
assert_raises(
tsa.exc.ArgumentError,
- event.listen, tracker("on_begin"), "on_begin", engine, retval=True
+ event.listen, engine, "on_begin", tracker("on_begin"), retval=True
)
- event.listen(on_execute, "on_before_execute", engine, retval=True)
- event.listen(on_cursor_execute, "on_before_cursor_execute", engine, retval=True)
+ event.listen(engine, "on_before_execute", on_execute, retval=True)
+ event.listen(engine, "on_before_cursor_execute", on_cursor_execute, retval=True)
engine.execute("select 1")
eq_(
canary, ['execute', 'cursor_execute']
return go
engine = engines.testing_engine()
- event.listen(tracker('execute'), 'on_before_execute', engine)
- event.listen(tracker('cursor_execute'), 'on_before_cursor_execute', engine)
- event.listen(tracker('begin'), 'on_begin', engine)
- event.listen(tracker('commit'), 'on_commit', engine)
- event.listen(tracker('rollback'), 'on_rollback', engine)
+ event.listen(engine, 'on_before_execute', tracker('execute'))
+ event.listen(engine, 'on_before_cursor_execute', tracker('cursor_execute'))
+ event.listen(engine, 'on_begin', tracker('begin'))
+ event.listen(engine, 'on_commit', tracker('commit'))
+ event.listen(engine, 'on_rollback', tracker('rollback'))
conn = engine.connect()
trans = conn.begin()
'rollback_savepoint', 'release_savepoint',
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']:
- event.listen(tracker(name), 'on_%s' % name, engine)
+ event.listen(engine, 'on_%s' % name, tracker(name))
conn = engine.connect()
def on_first_connect(*arg, **kw):
canary.append('first_connect')
- event.listen(on_first_connect, 'on_first_connect', p)
+ event.listen(p, 'on_first_connect', on_first_connect)
return p, canary
canary = []
def on_connect(*arg, **kw):
canary.append('connect')
- event.listen(on_connect, 'on_connect', p)
+ event.listen(p, 'on_connect', on_connect)
return p, canary
canary = []
def on_checkout(*arg, **kw):
canary.append('checkout')
- event.listen(on_checkout, 'on_checkout', p)
+ event.listen(p, 'on_checkout', on_checkout)
return p, canary
canary = []
def on_checkin(*arg, **kw):
canary.append('checkin')
- event.listen(on_checkin, 'on_checkin', p)
+ event.listen(p, 'on_checkin', on_checkin)
return p, canary
canary.append("listen_four")
engine = create_engine(testing.db.url)
- event.listen(listen_one, 'on_connect', pool.Pool)
- event.listen(listen_two, 'on_connect', engine.pool)
- event.listen(listen_three, 'on_connect', engine)
- event.listen(listen_four, 'on_connect', engine.__class__)
+ event.listen(pool.Pool, 'on_connect', listen_one)
+ event.listen(engine.pool, 'on_connect', listen_two)
+ event.listen(engine, 'on_connect', listen_three)
+ event.listen(engine.__class__, 'on_connect', listen_four)
engine.execute(select([1])).close()
eq_(
def listen_three(*args):
canary.append("listen_three")
- event.listen(listen_one, 'on_connect', pool.Pool)
- event.listen(listen_two, 'on_connect', pool.QueuePool)
- event.listen(listen_three, 'on_connect', pool.SingletonThreadPool)
+ event.listen(pool.Pool, 'on_connect', listen_one)
+ event.listen(pool.QueuePool, 'on_connect', listen_two)
+ event.listen(pool.SingletonThreadPool, 'on_connect', listen_three)
p1 = pool.QueuePool(creator=MockDBAPI().connect)
p2 = pool.SingletonThreadPool(creator=MockDBAPI().connect)
options = options or config.db_opts
engine = create_engine(url, **options)
- event.listen(asserter.execute, 'on_after_execute', engine)
- event.listen(asserter.cursor_execute, 'on_after_cursor_execute', engine)
- event.listen(testing_reaper.checkout, 'on_checkout', engine.pool)
+ event.listen(engine, 'on_after_execute', asserter.execute)
+ event.listen(engine, 'on_after_cursor_execute', asserter.cursor_execute)
+ event.listen(engine.pool, 'on_checkout', testing_reaper.checkout)
# may want to call this, results
# in first-connect initializers
attributes.register_attribute(Foo, 'barset', typecallable=set, uselist=True, useobject=True)
attributes.register_attribute(Bar, 'data', uselist=False, useobject=False)
- event.listen(on_set, 'on_set', Foo.data, retval=True)
- event.listen(on_append, 'on_append', Foo.barlist, retval=True)
- event.listen(on_append, 'on_append', Foo.barset, retval=True)
+ event.listen(Foo.data, 'on_set', on_set, retval=True)
+ event.listen(Foo.barlist, 'on_append', on_append, retval=True)
+ event.listen(Foo.barset, 'on_append', on_append, retval=True)
f1 = Foo()
f1.data = "some data"
canary.append(value)
def events_a():
- event.listen(on_set, 'on_set', classes[0].attrib, propagate=True)
+ event.listen(classes[0].attrib, 'on_set', on_set, propagate=True)
def teardown():
classes[:] = [None, None, None]
bind.engine.name not in ('oracle', 'mssql', 'sqlite')
),
):
- event.listen(ins, 'on_after_create', dt)
+ event.listen(dt, 'on_after_create', ins)
- event.listen(sa.DDL("DROP TRIGGER dt_ins"), 'on_before_drop', dt)
+ event.listen(dt, 'on_before_drop', sa.DDL("DROP TRIGGER dt_ins"))
for up in (
sa.DDL("CREATE TRIGGER dt_up AFTER UPDATE ON dt "
bind.engine.name not in ('oracle', 'mssql', 'sqlite')
),
):
- event.listen(up, 'on_after_create', dt)
+ event.listen(dt, 'on_after_create', up)
- event.listen(sa.DDL("DROP TRIGGER dt_up"), 'on_before_drop', dt)
+ event.listen(dt, 'on_before_drop', sa.DDL("DROP TRIGGER dt_up"))
@classmethod
manager = instrumentation.manager_of_class(cls)
def on_init(state, args, kwargs):
canary.append((cls, 'on_init', state.class_))
- event.listen(on_init, 'on_init', manager, raw=True)
+ event.listen(manager, 'on_init', on_init, raw=True)
def test_ai(self):
inits = []
try:
instrumentation.register_class(A)
manager = instrumentation.manager_of_class(A)
- event.listen(canary, 'on_load', manager)
+ event.listen(manager, 'on_load', canary)
a = A()
p_a = pickle.dumps(a)
def on_init_e(target, args, kwargs):
canary.append(('on_init_e', target))
- event.listen(on_init_a, 'on_init', mapper)
- event.listen(on_init_b, 'on_init', Mapper)
- event.listen(on_init_c, 'on_init', class_mapper(A))
- event.listen(on_init_d, 'on_init', A)
- event.listen(on_init_e, 'on_init', A, propagate=True)
+ event.listen(mapper, 'on_init', on_init_a)
+ event.listen(Mapper, 'on_init', on_init_b)
+ event.listen(class_mapper(A), 'on_init', on_init_c)
+ event.listen(A, 'on_init', on_init_d)
+ event.listen(A, 'on_init', on_init_e, propagate=True)
a = A()
eq_(canary, [('on_init_a', a),('on_init_b', a),
'on_before_delete',
'on_after_delete'
]:
- event.listen(evt(meth), meth, mapper, **kw)
+ event.listen(mapper, meth, evt(meth), **kw)
return canary
@testing.resolve_artifact_names
return u
mapper(User, users)
- event.listen(create_instance, 'on_create_instance',
- User, retval=True)
+ event.listen(User, 'on_create_instance', create_instance, retval=True)
sess = create_session()
u1 = User()
u1.name = 'ed'
def on_instrument_class(mapper, cls):
canary.append(cls)
- event.listen(on_instrument_class, 'on_instrument_class', Mapper)
+ event.listen(Mapper, 'on_instrument_class', on_instrument_class)
mapper(User, users)
eq_(canary, [User])
canary.called += 1
canary.called = 0
- event.listen(canary, 'on_load', cls)
+ event.listen(cls, 'on_load', canary)
return canary
def my_listener(*arg, **kw):
pass
- event.listen(my_listener, 'on_before_flush', Session)
+ event.listen(Session, 'on_before_flush', my_listener)
s = Session()
assert my_listener in s.dispatch.on_before_flush
S1 = sessionmaker()
S2 = sessionmaker()
- event.listen(my_listener_one, 'on_before_flush', Session)
- event.listen(my_listener_two, 'on_before_flush', S1)
+ event.listen(Session, 'on_before_flush', my_listener_one)
+ event.listen(S1, 'on_before_flush', my_listener_two)
s1 = S1()
assert my_listener_one in s1.dispatch.on_before_flush
sa.exc.ArgumentError,
"Session event listen on a ScopedSession "
"requries that its creation callable is a Session subclass.",
- event.listen, my_listener_one, "on_before_flush", scope
+ event.listen, scope, "on_before_flush", my_listener_one
)
def test_scoped_session_invalid_class(self):
sa.exc.ArgumentError,
"Session event listen on a ScopedSession "
"requries that its creation callable is a Session subclass.",
- event.listen, my_listener_one, "on_before_flush", scope
+ event.listen, scope, "on_before_flush", my_listener_one
)
def test_scoped_session_listen(self):
pass
scope = scoped_session(sessionmaker())
- event.listen(my_listener_one, "on_before_flush", scope)
+ event.listen(scope, "on_before_flush", my_listener_one)
assert my_listener_one in scope().dispatch.on_before_flush
'on_after_bulk_update',
'on_after_bulk_delete'
]:
- event.listen(listener(evt), evt, sess)
+ event.listen(sess, evt, listener(evt))
return sess, canary
session.flush()
sess = Session()
- event.listen(before_flush, 'on_before_flush', sess)
+ event.listen(sess, 'on_before_flush', before_flush)
sess.add(User(name='foo'))
assert_raises_message(sa.exc.InvalidRequestError,
'already flushing', sess.flush)
session.delete(x)
sess = Session()
- event.listen(before_flush, 'on_before_flush', sess)
+ event.listen(sess, 'on_before_flush', before_flush)
u = User(name='u1')
sess.add(u)
obj.name += " modified"
sess = Session(autoflush=True)
- event.listen(before_flush, 'on_before_flush', sess)
+ event.listen(sess, 'on_before_flush', before_flush)
u = User(name='u1')
sess.add(u)
mapper(User, users, batch=False)
evt = Events()
- event.listen(evt.before_insert, "on_before_insert", User)
- event.listen(evt.after_insert, "on_after_insert", User)
+ event.listen(User, "on_before_insert", evt.before_insert)
+ event.listen(User, "on_after_insert", evt.after_insert)
u1 = User(name='user1')
u2 = User(name='user2')
m = mapper(User, users)
evt = Events()
- event.listen(evt.before_insert, "on_before_insert", User)
- event.listen(evt.after_insert, "on_after_insert", User)
+ event.listen(User, "on_before_insert", evt.before_insert)
+ event.listen(User, "on_after_insert", evt.after_insert)
u1 = User(name='user1')
u2 = User(name='user2')