from __future__ import absolute_import, division, print_function, with_statement

import collections
import datetime
import errno
import functools
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
- self._callbacks = []
- self._callback_lock = threading.Lock()
+ self._callbacks = collections.deque()
self._timeouts = []
self._cancellations = 0
self._running = False
self.READ)
def close(self, all_fds=False):
- with self._callback_lock:
- self._closing = True
+ self._waker.mark_closing()
+ self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in self._handlers.values():
try:
while True:
- # Prevent IO event starvation by delaying new callbacks
- # to the next iteration of the event loop.
- with self._callback_lock:
- callbacks = self._callbacks
- self._callbacks = []
-
# Add any timeouts that have come due to the callback list.
# Do not run anything until we have determined which ones
# are ready, so timeouts that call add_timeout cannot
if x.callback is not None]
heapq.heapify(self._timeouts)
- for callback in callbacks:
- self._run_callback(callback)
+ # Prevent IO event starvation by delaying new callbacks
+ # to the next iteration of the event loop.
+ n = len(self._callbacks)
+ for i in range(n):
+ self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
- callbacks = callback = due_timeouts = timeout = None
+ due_timeouts = timeout = None
if self._callbacks:
# If any callbacks or timeouts called add_callback,
self._cancellations += 1
def add_callback(self, callback, *args, **kwargs):
    """Schedule ``callback`` to be run on the next IOLoop iteration.

    Safe to call from any thread at any time, and (because appends to
    ``self._callbacks`` — a ``collections.deque`` — are atomic under the
    GIL) from signal handlers on the IOLoop's own thread, without taking
    any lock.

    :param callback: callable to invoke on the IOLoop thread.
    :param args: positional arguments passed to ``callback``.
    :param kwargs: keyword arguments passed to ``callback``.

    If the loop is closing the callback is silently dropped.
    """
    if self._closing:
        # The loop is shutting down; scheduling more work is a no-op.
        return
    # Blindly insert into self._callbacks.  This is safe even
    # from signal handlers because deque.append is atomic.
    self._callbacks.append(functools.partial(
        stack_context.wrap(callback), *args, **kwargs))
    if thread.get_ident() != self._thread_ident:
        # Called from another thread: the IOLoop may be blocked in its
        # poll wait, so nudge it awake.  This will write one byte, but
        # Waker.consume() reads many at once, so it's ok to write even
        # when not strictly necessary.
        self._waker.wake()
    # If we're on the IOLoop's thread, no wake-up is needed: the loop is
    # running (not blocked in poll) and will drain self._callbacks on
    # its next pass.
def add_callback_from_signal(self, callback, *args, **kwargs):
with stack_context.NullContext():