import time
import math
import random
+from inspect import isawaitable
from tornado.concurrent import (
Future,
def add_timeout(
self,
deadline: Union[float, datetime.timedelta],
- callback: Callable[..., None],
+ callback: Callable[..., Optional[Awaitable]],
*args: Any,
**kwargs: Any
) -> object:
raise TypeError("Unsupported deadline %r" % deadline)
def call_later(
- self, delay: float, callback: Callable[..., None], *args: Any, **kwargs: Any
+ self, delay: float, callback: Callable, *args: Any, **kwargs: Any
) -> object:
"""Runs the ``callback`` after ``delay`` seconds have passed.
return self.call_at(self.time() + delay, callback, *args, **kwargs)
def call_at(
- self, when: float, callback: Callable[..., None], *args: Any, **kwargs: Any
+ self, when: float, callback: Callable, *args: Any, **kwargs: Any
) -> object:
"""Runs the ``callback`` at the absolute time designated by ``when``.
.. versionchanged:: 5.1
The ``jitter`` argument is added.
+
+ .. versionchanged:: 6.2
+ If the ``callback`` argument is a coroutine, and a callback runs for
+ longer than ``callback_time``, subsequent invocations will be skipped.
+ Previously this was only true for regular functions, not coroutines,
+ which were "fire-and-forget" for `PeriodicCallback`.
"""
def __init__(
self,
- callback: Callable[[], None],
+ callback: Callable[[], Optional[Awaitable]],
callback_time: Union[datetime.timedelta, float],
jitter: float = 0,
) -> None:
"""
return self._running
- def _run(self) -> None:
+ async def _run(self) -> None:
if not self._running:
return
try:
- return self.callback()
+ val = self.callback()
+ if val is not None and isawaitable(val):
+ await val
except Exception:
app_log.error("Exception in callback %r", self.callback, exc_info=True)
finally:
self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
self.io_loop.add_timeout(
- self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf") # type: ignore
+ self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf")
)
self.wait()
self.io_loop.remove_handler(client.fileno())
self.assertEqual(pc.callback_time, expected_callback_time)
+class TestPeriodicCallbackAsync(AsyncTestCase):
+ def test_periodic_plain(self):
+ count = 0
+
+ def callback() -> None:
+ nonlocal count
+ count += 1
+ if count == 3:
+ self.stop()
+
+ pc = PeriodicCallback(callback, 10)
+ pc.start()
+ self.wait()
+ pc.stop()
+ self.assertEqual(count, 3)
+
+ def test_periodic_coro(self):
+ counts = [0, 0]
+ pc = None
+
+ @gen.coroutine
+ def callback() -> None:
+ counts[0] += 1
+ yield gen.sleep(0.025)
+ counts[1] += 1
+ if counts[1] == 3:
+ pc.stop()
+ self.io_loop.add_callback(self.stop)
+
+ pc = PeriodicCallback(callback, 10)
+ pc.start()
+ self.wait()
+ self.assertEqual(counts[0], 3)
+ self.assertEqual(counts[1], 3)
+
+ def test_periodic_async(self):
+ counts = [0, 0]
+ pc = None
+
+ async def callback() -> None:
+ counts[0] += 1
+ await gen.sleep(0.025)
+ counts[1] += 1
+ if counts[1] == 3:
+ pc.stop()
+ self.io_loop.add_callback(self.stop)
+
+ pc = PeriodicCallback(callback, 10)
+ pc.start()
+ self.wait()
+ self.assertEqual(counts[0], 3)
+ self.assertEqual(counts[1], 3)
+
+
class TestIOLoopConfiguration(unittest.TestCase):
def run_python(self, *statements):
stmt_list = [