object."""
for ls in _event_descriptors(other):
- getattr(self, ls.name)._update(ls, only_propagate=only_propagate)
+ getattr(self, ls.name).\
+ for_modify(self)._update(ls, only_propagate=only_propagate)
def _event_descriptors(target):
return [getattr(target, k) for k in dir(target) if _is_event_name(k)]
@classmethod
def _listen(cls, target, identifier, fn, propagate=False, insert=False):
if insert:
- getattr(target.dispatch, identifier).insert(fn, target, propagate)
+ getattr(target.dispatch, identifier).\
+ for_modify(target.dispatch).insert(fn, target, propagate)
else:
- getattr(target.dispatch, identifier).append(fn, target, propagate)
+ getattr(target.dispatch, identifier).\
+ for_modify(target.dispatch).append(fn, target, propagate)
@classmethod
def _remove(cls, target, identifier, fn):
self.__name__ = fn.__name__
self.__doc__ = fn.__doc__
self._clslevel = util.defaultdict(list)
+ self._empty_listeners = {}
def insert(self, obj, target, propagate):
assert isinstance(target, type), \
for dispatcher in self._clslevel.values():
dispatcher[:] = []
+ def for_modify(self, obj):
+ """Return an event collection which can be modified.
+
+ For _DispatchDescriptor at the class level of
+ a dispatcher, this returns self.
+
+ """
+ return self
+
def __get__(self, obj, cls):
if obj is None:
return self
- obj.__dict__[self.__name__] = result = \
- _ListenerCollection(self, obj._parent_cls)
+ elif obj._parent_cls in self._empty_listeners:
+ ret = self._empty_listeners[obj._parent_cls]
+ else:
+ self._empty_listeners[obj._parent_cls] = ret = \
+ _EmptyListener(self, obj._parent_cls)
+ # assigning it to __dict__ means
+ # memoized for fast re-access. but more memory.
+ obj.__dict__[self.__name__] = ret
+ return ret
+
+class _EmptyListener(object):
+ """Serves as a class-level interface to the events
+ served by a _DispatchDescriptor, when there are no
+ instance-level events present.
+
+ Is replaced by _ListenerCollection when instance-level
+ events are added.
+
+ """
+ def __init__(self, parent, target_cls):
+ if target_cls not in parent._clslevel:
+ parent.update_subclass(target_cls)
+ self.parent = parent
+ self.parent_listeners = parent._clslevel[target_cls]
+ self.name = parent.__name__
+ self.propagate = frozenset()
+ self.listeners = ()
+
+ def for_modify(self, obj):
+ """Return an event collection which can be modified.
+
+ For _EmptyListener at the instance level of
+ a dispatcher, this generates a new
+ _ListenerCollection, applies it to the instance,
+ and returns it.
+
+ """
+ obj.__dict__[self.name] = result = _ListenerCollection(
+ self.parent, obj._parent_cls)
return result
+ def _needs_modify(self, *args, **kw):
+ raise NotImplementedError("need to call for_modify()")
+
+ exec_once = insert = append = remove = clear = _needs_modify
+
+ def __call__(self, *args, **kw):
+ """Execute this event."""
+
+ for fn in self.parent_listeners:
+ fn(*args, **kw)
+
+ def __len__(self):
+ return len(self.parent_listeners)
+
+ def __iter__(self):
+ return iter(self.parent_listeners)
+
+ def __getitem__(self, index):
+ return (self.parent_listeners)[index]
+
+ def __nonzero__(self):
+ return bool(self.listeners)
+
+
class _ListenerCollection(object):
"""Instance-level attributes on instances of :class:`._Dispatch`.
Represents a collection of listeners.
+ As of 0.7.9, _ListenerCollection is only first
+ created via the _EmptyListener.for_modify() method.
+
"""
_exec_once = False
self.listeners = []
self.propagate = set()
+ def for_modify(self, obj):
+ """Return an event collection which can be modified.
+
+ For _ListenerCollection at the instance level of
+ a dispatcher, this returns self.
+
+ """
+ return self
+
def exec_once(self, *args, **kw):
"""Execute this event, but only if it has not been
executed already for this collection."""
# I'm not entirely thrilled about the overhead here,
# but this allows class-level listeners to be added
# at any point.
- #
- # alternatively, _DispatchDescriptor could notify
- # all _ListenerCollection objects, but then we move
- # to a higher memory model, i.e.weakrefs to all _ListenerCollection
- # objects, the _DispatchDescriptor collection repeated
- # for all instances.
+ #
+ # In the absense of instance-level listeners,
+ # we stay with the _EmptyListener object when called
+ # at the instance level.
def __len__(self):
return len(self.parent_listeners + self.listeners)
"""Test event registration and listening."""
-from test.lib.testing import eq_, assert_raises
+from test.lib.testing import eq_, assert_raises, assert_raises_message, \
+ is_, is_not_
from sqlalchemy import event, exc, util
from test.lib import fixtures
[listen_two]
)
+ def test_no_instance_level_collections(self):
+ @event.listens_for(self.Target, "event_one")
+ def listen_one(x, y):
+ pass
+ t1 = self.Target()
+ t2 = self.Target()
+ t1.dispatch.event_one(5, 6)
+ t2.dispatch.event_one(5, 6)
+ is_(
+ t1.dispatch.__dict__['event_one'],
+ self.Target.dispatch.event_one.\
+ _empty_listeners[self.Target]
+ )
+
+ @event.listens_for(t1, "event_one")
+ def listen_two(x, y):
+ pass
+ is_not_(
+ t1.dispatch.__dict__['event_one'],
+ self.Target.dispatch.event_one.\
+ _empty_listeners[self.Target]
+ )
+ is_(
+ t2.dispatch.__dict__['event_one'],
+ self.Target.dispatch.event_one.\
+ _empty_listeners[self.Target]
+ )
+
+ def test_immutable_methods(self):
+ t1 = self.Target()
+ for meth in [
+ t1.dispatch.event_one.exec_once,
+ t1.dispatch.event_one.insert,
+ t1.dispatch.event_one.append,
+ t1.dispatch.event_one.remove,
+ t1.dispatch.event_one.clear,
+ ]:
+ assert_raises_message(
+ NotImplementedError,
+ r"need to call for_modify\(\)",
+ meth
+ )
+
class TestClsLevelListen(fixtures.TestBase):
+
+
+ def tearDown(self):
+ event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
+
def setUp(self):
class TargetEventsOne(event.Events):
def event_one(self, x, y):
assert handler2 not in s2.dispatch.event_one
-class TestClsLevelListen(fixtures.TestBase):
- def setUp(self):
- class TargetEventsOne(event.Events):
- def event_one(self, x, y):
- pass
- class TargetOne(object):
- dispatch = event.dispatcher(TargetEventsOne)
- self.TargetOne = TargetOne
-
- def tearDown(self):
- event._remove_dispatcher(self.TargetOne.__dict__['dispatch'].events)
-
- def test_lis_subcalss_lis(self):
- @event.listens_for(self.TargetOne, "event_one")
- def handler1(x, y):
- print 'handler1'
-
- class SubTarget(self.TargetOne):
- pass
-
- @event.listens_for(self.TargetOne, "event_one")
- def handler2(x, y):
- pass
-
- eq_(
- len(SubTarget().dispatch.event_one),
- 2
- )
class TestAcceptTargets(fixtures.TestBase):
"""Test default target acceptance."""