"""A level-triggered I/O loop for non-blocking sockets."""
-import bisect
import errno
+import heapq
import os
import logging
import select
if self._timeouts:
now = time.time()
- while self._timeouts and self._timeouts[0].deadline <= now:
- timeout = self._timeouts.pop(0)
- self._run_callback(timeout.callback)
- if self._timeouts:
- milliseconds = self._timeouts[0].deadline - now
- poll_timeout = min(milliseconds, poll_timeout)
+ while self._timeouts:
+ if self._timeouts[0].callback is None:
+ # the timeout was cancelled
+ heapq.heappop(self._timeouts)
+ elif self._timeouts[0].deadline <= now:
+ timeout = heapq.heappop(self._timeouts)
+ self._run_callback(timeout.callback)
+ else:
+ milliseconds = self._timeouts[0].deadline - now
+ poll_timeout = min(milliseconds, poll_timeout)
+ break
if not self._running:
break
Returns a handle that may be passed to remove_timeout to cancel.
"""
timeout = _Timeout(deadline, stack_context.wrap(callback))
- bisect.insort(self._timeouts, timeout)
+ heapq.heappush(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
The argument is a handle as returned by add_timeout.
"""
- self._timeouts.remove(timeout)
+ # Removing from a heap is complicated, so just leave the defunct
+ # timeout object in the queue (see discussion in
+ # http://docs.python.org/library/heapq.html).
+ # If this turns out to be a problem, we could add a garbage
+ # collection pass whenever there are too many dead timeouts.
+ timeout.callback = None
def add_callback(self, callback):
"""Calls the given callback on the next I/O loop iteration.
self.callback = callback
def __lt__(self, other):
- return ((self.deadline, id(self.callback)) <
- (other.deadline, id(other.callback)))
+ return ((self.deadline, id(self)) <
+ (other.deadline, id(other)))
class PeriodicCallback(object):