self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
+ self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
if self._timeouts[0].callback is None:
# the timeout was cancelled
heapq.heappop(self._timeouts)
+ self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
timeout = heapq.heappop(self._timeouts)
self._run_callback(timeout.callback)
seconds = self._timeouts[0].deadline - now
poll_timeout = min(seconds, poll_timeout)
break
+ if (self._cancellations > 512
+ and self._cancellations > (len(self._timeouts) >> 1)):
+ # Clean up the timeout queue when it gets large and it's
+ # more than half cancellations.
+ self._cancellations = 0
+ self._timeouts = [x for x in self._timeouts
+ if x.callback is not None]
+ heapq.heapify(self._timeouts)
if self._callbacks:
# If any callbacks or timeouts called add_callback,
# If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
+ self._cancellations += 1
def add_callback(self, callback, *args, **kwargs):
with self._callback_lock:
self.wait()
self.io_loop.remove_timeout(handle)
+ def test_remove_timeout_cleanup(self):
+ # Add and remove enough callbacks to trigger cleanup.
+ # Not a very thorough test, but it ensures that the cleanup code
+ # gets executed and doesn't blow up. This test is only really useful
+ # on PollIOLoop subclasses, but it should run silently on any
+ # implementation.
+ for i in range(2000):
+ timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600,
+ lambda: None)
+ self.io_loop.remove_timeout(timeout)
+ # HACK: wait two IOLoop iterations for the GC to happen.
+ self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
+ self.wait()
+
# Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
# automatically set as current.