def _schedule_next(self):
if self._running:
- current_time = self.io_loop.time()
-
- if self._next_timeout <= current_time:
- callback_time_sec = self.callback_time / 1000.0
- self._next_timeout += (math.floor((current_time - self._next_timeout) /
- callback_time_sec) + 1) * callback_time_sec
-
+ self._update_next(self.io_loop.time())
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
+
+ def _update_next(self, current_time):
+ if self._next_timeout <= current_time:
+ callback_time_sec = self.callback_time / 1000.0
+ self._next_timeout += (math.floor((current_time - self._next_timeout) /
+ callback_time_sec) + 1) * callback_time_sec
io_loop.close()
+class TestPeriodicCallbackMath(unittest.TestCase):
+ def simulate_calls(self, pc, durations):
+ """Simulate a series of calls to the PeriodicCallback.
+
+ Pass a list of call durations in seconds. This method
+ returns the times at which each call would be made.
+ """
+ calls = []
+ now = 1000
+ pc._next_timeout = now
+ for d in durations:
+ pc._update_next(now)
+ calls.append(pc._next_timeout)
+ now = pc._next_timeout + d
+ return calls
+
+ def test_basic(self):
+ pc = PeriodicCallback(None, 10000)
+ self.assertEqual(self.simulate_calls(pc, [0] * 5),
+ [1010, 1020, 1030, 1040, 1050])
+
+ def test_overrun(self):
+ # If a call runs for too long, we skip entire cycles to get
+ # back on schedule.
+ call_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0, 0]
+ expected = [
+ 1010, 1020, 1030, # first 3 calls on schedule
+ 1050, 1070, # next 2 delayed one cycle
+ 1100, 1130, # next 2 delayed 2 cycles
+ 1170, 1210, # next 2 delayed 3 cycles
+ 1220, 1230, # then back on schedule.
+ ]
+
+ pc = PeriodicCallback(None, 10000)
+ self.assertEqual(self.simulate_calls(pc, call_durations),
+ expected)
+
+
class TestIOLoopConfiguration(unittest.TestCase):
def run_python(self, *statements):
statements = [