deadline,
functools.partial(stack_context.wrap(callback), *args, **kwargs),
self)
- heapq.heappush(self._timeouts, timeout)
+ try:
+ heapq.heappush(self._timeouts, timeout)
+ except:
+ print(self._timeouts, timeout)
+ raise
return timeout
def remove_timeout(self, timeout):
"""Schedules the given callback to be called periodically.
The callback is called every ``callback_time`` milliseconds.
+ Note that the timeout is given in milliseconds, while most other
+ time-related functions in Tornado use seconds.
+
+ If the callback runs for longer than ``callback_time`` milliseconds,
+ subsequent invocations will be skipped to get back on schedule.
`start` must be called after the `PeriodicCallback` is created.
def _schedule_next(self):
if self._running:
current_time = self.io_loop.time()
-
+
if self._next_timeout <= current_time:
callback_time_sec = self.callback_time / 1000.0
self._next_timeout += (math.floor((current_time - self._next_timeout) / callback_time_sec) + 1) * callback_time_sec
import time
from tornado import gen
-from tornado.ioloop import IOLoop, TimeoutError
+from tornado.ioloop import IOLoop, TimeoutError, PollIOLoop, PeriodicCallback
from tornado.log import app_log
+from tornado.platform.select import _Select
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog
from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
futures = None
+class FakeTimeSelect(_Select):
+ def __init__(self):
+ self._time = 1000
+ super(FakeTimeSelect, self).__init__()
+
+ def time(self):
+ return self._time
+
+ def sleep(self, t):
+ self._time += t
+
+ def poll(self, timeout):
+ events = super(FakeTimeSelect, self).poll(0)
+ if events:
+ return events
+ self._time += timeout
+ return []
+
+
+class FakeTimeIOLoop(PollIOLoop):
+ """IOLoop implementation with a fake and deterministic clock.
+
+ The clock advances as needed to trigger timeouts immediately.
+ For use when testing code that involves the passage of time
+ and no external dependencies.
+ """
+ def initialize(self):
+ self.fts = FakeTimeSelect()
+ super(FakeTimeIOLoop, self).initialize(impl=self.fts,
+ time_func=self.fts.time)
+
+ def sleep(self, t):
+ """Simulate a blocking sleep by advancing the clock."""
+ self.fts.sleep(t)
+
+
class TestIOLoop(AsyncTestCase):
@skipOnTravis
def test_add_callback_wakeup(self):
self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
+class TestPeriodicCallback(unittest.TestCase):
+ def setUp(self):
+ self.io_loop = FakeTimeIOLoop()
+ self.io_loop.make_current()
+
+ def tearDown(self):
+ self.io_loop.close()
+
+ def test_basic(self):
+ calls = []
+
+ def cb():
+ calls.append(self.io_loop.time())
+ pc = PeriodicCallback(cb, 10000)
+ pc.start()
+ self.io_loop.call_later(50, self.io_loop.stop)
+ self.io_loop.start()
+ self.assertEqual(calls, [1010, 1020, 1030, 1040, 1050])
+
+ def test_overrun(self):
+ sleep_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0]
+ expected = [
+ 1010, 1020, 1030, # first 3 calls on schedule
+ 1050, 1070, # next 2 delayed one cycle
+ 1100, 1130, # next 2 delayed 2 cycles
+ 1170, 1210, # next 2 delayed 3 cycles
+ 1220, 1230, # then back on schedule.
+ ]
+ calls = []
+
+ def cb():
+ calls.append(self.io_loop.time())
+ if not sleep_durations:
+ self.io_loop.stop()
+ return
+ self.io_loop.sleep(sleep_durations.pop(0))
+ pc = PeriodicCallback(cb, 10000)
+ pc.start()
+ self.io_loop.start()
+ self.assertEqual(calls, expected)
+
+
if __name__ == "__main__":
unittest.main()