_registrars = util.defaultdict(list)
+def _is_event_name(name):
+ return not name.startswith('_') and name != 'dispatch'
+
class _UnpickleDispatch(object):
"""Serializable callable that re-generates an instance of :class:`_Dispatch`
given a particular :class:`.Events` subclass.
"""
- def __call__(self, parent_cls):
- return parent_cls.__dict__['dispatch'].dispatch_cls(parent_cls)
+ def __call__(self, _parent_cls):
+ return _parent_cls.__dict__['dispatch'].dispatch_cls(_parent_cls)
class _Dispatch(object):
"""Mirror the event listening definitions of an Events class with
"""
- def __init__(self, parent_cls):
- self.parent_cls = parent_cls
+ def __init__(self, _parent_cls):
+ self._parent_cls = _parent_cls
def __reduce__(self):
- return _UnpickleDispatch(), (self.parent_cls, )
+ return _UnpickleDispatch(), (self._parent_cls, )
@property
- def descriptors(self):
- return (getattr(self, k) for k in dir(self) if k.startswith("on_"))
+ def _descriptors(self):
+ return (getattr(self, k) for k in dir(self) if _is_event_name(k))
- def update(self, other, only_propagate=True):
+ def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
object."""
- for ls in other.descriptors:
- getattr(self, ls.name).update(ls, only_propagate=only_propagate)
+ for ls in other._descriptors:
+ getattr(self, ls.name)._update(ls, only_propagate=only_propagate)
class _EventMeta(type):
dispatch_cls._clear = cls._clear
for k in dict_:
- if k.startswith('on_'):
+ if _is_event_name(k):
setattr(dispatch_cls, k, _DispatchDescriptor(dict_[k]))
_registrars[k].append(cls)
def _remove_dispatcher(cls):
for k in dir(cls):
- if k.startswith('on_'):
+ if _is_event_name(k):
_registrars[k].remove(cls)
if not _registrars[k]:
del _registrars[k]
@classmethod
def _clear(cls):
for attr in dir(cls.dispatch):
- if attr.startswith("on_"):
+ if _is_event_name(attr):
getattr(cls.dispatch, attr).clear()
class _DispatchDescriptor(object):
if obj is None:
return self
obj.__dict__[self.__name__] = result = \
- _ListenerCollection(self, obj.parent_cls)
+ _ListenerCollection(self, obj._parent_cls)
return result
class _ListenerCollection(object):
def __nonzero__(self):
return bool(self.listeners or self.parent_listeners)
- def update(self, other, only_propagate=True):
+ def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
object."""
# immediate superclass
for base in manager._bases:
if key in base:
- self.dispatch.update(base[key].dispatch)
+ self.dispatch._update(base[key].dispatch)
dispatch = event.dispatcher(events.AttributeEvents)
- dispatch.dispatch_cls.active_history = False
+ dispatch.dispatch_cls._active_history = False
@util.memoized_property
def _supports_population(self):
ext._adapt_listener(attr, ext)
if active_history:
- self.dispatch.active_history = True
+ self.dispatch._active_history = True
self.expire_missing = expire_missing
def _get_active_history(self):
"""Backwards compat for impl.active_history"""
- return self.dispatch.active_history
+ return self.dispatch._active_history
def _set_active_history(self, value):
- self.dispatch.active_history = value
+ self.dispatch._active_history = value
active_history = property(_get_active_history, _set_active_history)
def delete(self, state, dict_):
# TODO: catch key errors, convert to attributeerror?
- if self.dispatch.active_history:
+ if self.dispatch._active_history:
old = self.get(state, dict_)
else:
old = dict_.get(self.key, NO_VALUE)
if initiator and initiator.parent_token is self.parent_token:
return
- if self.dispatch.active_history:
+ if self.dispatch._active_history:
old = self.get(state, dict_)
else:
old = dict_.get(self.key, NO_VALUE)
if initiator and initiator.parent_token is self.parent_token:
return
- if self.dispatch.active_history:
+ if self.dispatch._active_history:
old = self.get(state, dict_, passive=PASSIVE_ONLY_PERSISTENT)
else:
old = self.get(state, dict_, passive=PASSIVE_NO_FETCH)
raw=False, retval=False,
propagate=False):
if active_history:
- target.dispatch.active_history = True
+ target.dispatch._active_history = True
# TODO: for removal, need to package the identity
# of the wrapper with the original function.
def _configure_legacy_instrument_class(self):
if self.inherits:
- self.dispatch.update(self.inherits.dispatch)
+ self.dispatch._update(self.inherits.dispatch)
super_extensions = set(chain(*[m._deprecated_extensions
for m in self.inherits.iterate_to_root()]))
else:
ext._adapt_listener(self, ext)
if self.inherits:
- self.class_manager.dispatch.update(
+ self.class_manager.dispatch._update(
self.inherits.class_manager.dispatch)
def _configure_class_instrumentation(self):
self._reset_on_return = reset_on_return
self.echo = echo
if _dispatch:
- self.dispatch.update(_dispatch, only_propagate=False)
+ self.dispatch._update(_dispatch, only_propagate=False)
if events:
for fn, target in events:
event.listen(self, target, fn)
t2 = Target()
- t2.dispatch.update(t1.dispatch)
+ t2.dispatch._update(t1.dispatch)
t2.dispatch.on_event_one(t2, 1)
t2.dispatch.on_event_two(t2, 2)
# the error condition relies upon
# these things being true
- assert User.foo.dispatch.active_history is False
+ assert User.foo.dispatch._active_history is False
eq_(
attributes.get_history(u1, 'foo'),
([None], (), [attributes.PASSIVE_NO_RESULT])
instrumentation.register_class(A)
manager = instrumentation.manager_of_class(A)
- assert issubclass(manager.dispatch.parent_cls.__dict__['dispatch'].events, MyEvents)
+ assert issubclass(manager.dispatch._parent_cls.__dict__['dispatch'].events, MyEvents)
class NativeInstrumentationTest(_base.ORMTest):