"""
import asyncio
-from concurrent.futures import ThreadPoolExecutor
+import concurrent.futures
import datetime
import logging
import numbers
from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback # noqa: E501
from tornado.log import app_log
-from tornado.util import Configurable, TimeoutError, unicode_type, import_object
+from tornado.util import Configurable, TimeoutError, import_object
-import typing # noqa
+import typing
+from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable
+if typing.TYPE_CHECKING:
+ from typing import Dict, List # noqa: F401
+
+ from typing_extensions import Protocol
+else:
+ Protocol = object
+
+
+class _Selectable(Protocol):
+ def fileno(self) -> int:
+ pass
+
+ def close(self) -> None:
+ pass
+
+
+_T = TypeVar('_T')
+_S = TypeVar('_S', bound=_Selectable)
class IOLoop(Configurable):
ERROR = 0x018
# In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
- _ioloop_for_asyncio = dict() # type: typing.Dict[typing.Any, typing.Any]
+ _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop]
@classmethod
- def configure(cls, impl, **kwargs):
+ def configure(cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any) -> None:
if asyncio is not None:
from tornado.platform.asyncio import BaseAsyncIOLoop
- if isinstance(impl, (str, unicode_type)):
+ if isinstance(impl, str):
impl = import_object(impl)
- if not issubclass(impl, BaseAsyncIOLoop):
+ if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop):
raise RuntimeError(
"only AsyncIOLoop is allowed when asyncio is available")
super(IOLoop, cls).configure(impl, **kwargs)
@staticmethod
- def instance():
+ def instance() -> 'IOLoop':
"""Deprecated alias for `IOLoop.current()`.
.. versionchanged:: 5.0
"""
return IOLoop.current()
- def install(self):
+ def install(self) -> None:
"""Deprecated alias for `make_current()`.
.. versionchanged:: 5.0
self.make_current()
@staticmethod
- def clear_instance():
+ def clear_instance() -> None:
"""Deprecated alias for `clear_current()`.
.. versionchanged:: 5.0
"""
IOLoop.clear_current()
+ @typing.overload
+ @staticmethod
+ def current() -> 'IOLoop':
+ pass
+
+ @typing.overload # noqa: F811
@staticmethod
- def current(instance=True):
+ def current(instance: bool=True) -> Optional['IOLoop']:
+ pass
+
+ @staticmethod # noqa: F811
+ def current(instance: bool=True) -> Optional['IOLoop']:
"""Returns the current thread's `IOLoop`.
If an `IOLoop` is currently running or has been marked as
except KeyError:
if instance:
from tornado.platform.asyncio import AsyncIOMainLoop
- current = AsyncIOMainLoop(make_current=True)
+ current = AsyncIOMainLoop(make_current=True) # type: Optional[IOLoop]
else:
current = None
return current
- def make_current(self):
+ def make_current(self) -> None:
"""Makes this the `IOLoop` for the current thread.
An `IOLoop` automatically becomes current for its thread
raise NotImplementedError()
@staticmethod
- def clear_current():
+ def clear_current() -> None:
"""Clears the `IOLoop` for the current thread.
Intended primarily for use by test frameworks in between tests.
if asyncio is None:
IOLoop._current.instance = None
- def _clear_current_hook(self):
+ def _clear_current_hook(self) -> None:
"""Instance method called when an IOLoop ceases to be current.
May be overridden by subclasses as a counterpart to make_current.
pass
@classmethod
- def configurable_base(cls):
+ def configurable_base(cls) -> Type[Configurable]:
return IOLoop
@classmethod
- def configurable_default(cls):
+ def configurable_default(cls) -> Type[Configurable]:
from tornado.platform.asyncio import AsyncIOLoop
return AsyncIOLoop
- def initialize(self, make_current=None):
+ def initialize(self, make_current: bool=None) -> None:
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
raise RuntimeError("current IOLoop already exists")
self.make_current()
- def close(self, all_fds=False):
+ def close(self, all_fds: bool=False) -> None:
"""Closes the `IOLoop`, freeing any resources used.
If ``all_fds`` is true, all file descriptors registered on the
"""
raise NotImplementedError()
- def add_handler(self, fd, handler, events):
+ @typing.overload
+ def add_handler(self, fd: int, handler: Callable[[int, int], None], events: int) -> None:
+ pass
+
+ @typing.overload # noqa: F811
+ def add_handler(self, fd: _S, handler: Callable[[_S, int], None], events: int) -> None:
+ pass
+
+ def add_handler(self, fd: Union[int, _Selectable], # noqa: F811
+ handler: Callable[..., None], events: int) -> None:
"""Registers the given handler to receive the given events for ``fd``.
The ``fd`` argument may either be an integer file descriptor or
- a file-like object with a ``fileno()`` method (and optionally a
- ``close()`` method, which may be called when the `IOLoop` is shut
- down).
+ a file-like object with a ``fileno()`` and ``close()`` method.
The ``events`` argument is a bitwise or of the constants
``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
"""
raise NotImplementedError()
- def update_handler(self, fd, events):
+ def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
"""Changes the events we listen for ``fd``.
.. versionchanged:: 4.0
"""
raise NotImplementedError()
- def remove_handler(self, fd):
+ def remove_handler(self, fd: Union[int, _Selectable]) -> None:
"""Stop listening for events on ``fd``.
.. versionchanged:: 4.0
"""
raise NotImplementedError()
- def start(self):
+ def start(self) -> None:
"""Starts the I/O loop.
The loop will run until one of the callbacks calls `stop()`, which
"""
raise NotImplementedError()
- def _setup_logging(self):
+ def _setup_logging(self) -> None:
"""The IOLoop catches and logs exceptions, so it's
important that log output be visible. However, python's
default behavior for non-root loggers (prior to python
logging.getLogger('tornado.application').handlers]):
logging.basicConfig()
- def stop(self):
+ def stop(self) -> None:
"""Stop the I/O loop.
If the event loop is not currently running, the next call to `start()`
"""
raise NotImplementedError()
- def run_sync(self, func, timeout=None):
+ def run_sync(self, func: Callable, timeout: float=None) -> Any:
"""Starts the `IOLoop`, runs the given function, and stops the loop.
The function must return either an awaitable object or
If a timeout occurs, the ``func`` coroutine will be cancelled.
"""
- future_cell = [None]
+ future_cell = [None] # type: List[Optional[Future]]
- def run():
+ def run() -> None:
try:
result = func()
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
except Exception:
- future_cell[0] = Future()
- future_set_exc_info(future_cell[0], sys.exc_info())
+ fut = Future() # type: Future[Any]
+ future_cell[0] = fut
+ future_set_exc_info(fut, sys.exc_info())
else:
if is_future(result):
future_cell[0] = result
else:
- future_cell[0] = Future()
- future_cell[0].set_result(result)
+ fut = Future()
+ future_cell[0] = fut
+ fut.set_result(result)
+ assert future_cell[0] is not None
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
if timeout is not None:
- def timeout_callback():
+ def timeout_callback() -> None:
# If we can cancel the future, do so and wait on it. If not,
# Just stop the loop and return with the task still pending.
# (If we neither cancel nor wait for the task, a warning
# will be logged).
+ assert future_cell[0] is not None
if not future_cell[0].cancel():
self.stop()
timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
self.start()
if timeout is not None:
self.remove_timeout(timeout_handle)
+ assert future_cell[0] is not None
if future_cell[0].cancelled() or not future_cell[0].done():
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()
- def time(self):
+ def time(self) -> float:
"""Returns the current time according to the `IOLoop`'s clock.
The return value is a floating-point number relative to an
"""
return time.time()
- def add_timeout(self, deadline, callback, *args, **kwargs):
+ def add_timeout(self, deadline: Union[float, datetime.timedelta],
+ callback: Callable[..., None],
+ *args: Any, **kwargs: Any) -> object:
"""Runs the ``callback`` at the time ``deadline`` from the I/O loop.
Returns an opaque handle that may be passed to
else:
raise TypeError("Unsupported deadline %r" % deadline)
- def call_later(self, delay, callback, *args, **kwargs):
+ def call_later(self, delay: float, callback: Callable[..., None],
+ *args: Any, **kwargs: Any) -> object:
"""Runs the ``callback`` after ``delay`` seconds have passed.
Returns an opaque handle that may be passed to `remove_timeout`
"""
return self.call_at(self.time() + delay, callback, *args, **kwargs)
- def call_at(self, when, callback, *args, **kwargs):
+ def call_at(self, when: float, callback: Callable[..., None],
+ *args: Any, **kwargs: Any) -> object:
"""Runs the ``callback`` at the absolute time designated by ``when``.
``when`` must be a number using the same reference point as
"""
return self.add_timeout(when, callback, *args, **kwargs)
- def remove_timeout(self, timeout):
+ def remove_timeout(self, timeout: object) -> None:
"""Cancels a pending timeout.
The argument is a handle as returned by `add_timeout`. It is
"""
raise NotImplementedError()
- def add_callback(self, callback, *args, **kwargs):
+ def add_callback(self, callback: Callable,
+ *args: Any, **kwargs: Any) -> None:
"""Calls the given callback on the next I/O loop iteration.
It is safe to call this method from any thread at any time,
"""
raise NotImplementedError()
- def add_callback_from_signal(self, callback, *args, **kwargs):
+ def add_callback_from_signal(self, callback: Callable,
+ *args: Any, **kwargs: Any) -> None:
"""Calls the given callback on the next I/O loop iteration.
Safe for use from a Python signal handler; should not be used
"""
raise NotImplementedError()
- def spawn_callback(self, callback, *args, **kwargs):
+ def spawn_callback(self, callback: Callable,
+ *args: Any, **kwargs: Any) -> None:
"""Calls the given callback on the next IOLoop iteration.
As of Tornado 6.0, this method is equivalent to `add_callback`.
"""
self.add_callback(callback, *args, **kwargs)
- def add_future(self, future, callback):
+ def add_future(self, future: Union['Future[_T]', 'concurrent.futures.Future[_T]'],
+ callback: Callable[['Future[_T]'], None]) -> None:
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
future_add_done_callback(
future, lambda future: self.add_callback(callback, future))
- def run_in_executor(self, executor, func, *args):
+ def run_in_executor(self, executor: Optional[concurrent.futures.Executor],
+ func: Callable[..., _T], *args: Any) -> Awaitable[_T]:
"""Runs a function in a ``concurrent.futures.Executor``. If
``executor`` is ``None``, the IO loop's default executor will be used.
if executor is None:
if not hasattr(self, '_executor'):
from tornado.process import cpu_count
- self._executor = ThreadPoolExecutor(max_workers=(cpu_count() * 5))
+ self._executor = concurrent.futures.ThreadPoolExecutor(
+ max_workers=(cpu_count() * 5)) # type: concurrent.futures.Executor
executor = self._executor
c_future = executor.submit(func, *args)
# Concurrent Futures are not usable with await. Wrap this in a
# Tornado Future instead, using self.add_future for thread-safety.
- t_future = Future()
+ t_future = Future() # type: Future[_T]
self.add_future(c_future, lambda f: chain_future(f, t_future))
return t_future
- def set_default_executor(self, executor):
+ def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
"""Sets the default executor to use with :meth:`run_in_executor`.
.. versionadded:: 5.0
"""
self._executor = executor
- def _run_callback(self, callback):
+ def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
For use in subclasses.
except Exception:
app_log.error("Exception in callback %r", callback, exc_info=True)
- def _discard_future_result(self, future):
+ def _discard_future_result(self, future: Future) -> None:
"""Avoid unhandled-exception warnings from spawned coroutines."""
future.result()
- def split_fd(self, fd):
+ def split_fd(self, fd: Union[int, _Selectable]) -> Tuple[int, Union[int, _Selectable]]:
"""Returns an (fd, obj) pair from an ``fd`` parameter.
We accept both raw file descriptors and file-like objects as
.. versionadded:: 4.0
"""
- try:
- return fd.fileno(), fd
- except AttributeError:
+ if isinstance(fd, int):
return fd, fd
+ return fd.fileno(), fd
- def close_fd(self, fd):
+ def close_fd(self, fd: Union[int, _Selectable]) -> None:
"""Utility method to close an ``fd``.
If ``fd`` is a file-like object, we close it directly; otherwise
.. versionadded:: 4.0
"""
try:
- try:
- fd.close()
- except AttributeError:
+ if isinstance(fd, int):
os.close(fd)
+ else:
+ fd.close()
except OSError:
pass
# Reduce memory overhead when there are lots of pending callbacks
__slots__ = ['deadline', 'callback', 'tdeadline']
- def __init__(self, deadline, callback, io_loop):
+ def __init__(self, deadline: float, callback: Callable[[], None],
+ io_loop: IOLoop) -> None:
if not isinstance(deadline, numbers.Real):
raise TypeError("Unsupported deadline %r" % deadline)
self.deadline = deadline
self.callback = callback
- self.tdeadline = (deadline, next(io_loop._timeout_counter))
+ self.tdeadline = (deadline, next(io_loop._timeout_counter)) # type: Tuple[float, int]
# Comparison methods to sort by deadline, with object id as a tiebreaker
# to guarantee a consistent ordering. The heapq module uses __le__
# in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
# use __lt__).
- def __lt__(self, other):
+ def __lt__(self, other: '_Timeout') -> bool:
return self.tdeadline < other.tdeadline
- def __le__(self, other):
+ def __le__(self, other: '_Timeout') -> bool:
return self.tdeadline <= other.tdeadline
.. versionchanged:: 5.1
The ``jitter`` argument is added.
"""
- def __init__(self, callback, callback_time, jitter=0):
+ def __init__(self, callback: Callable[[], None],
+ callback_time: float, jitter: float=0) -> None:
self.callback = callback
if callback_time <= 0:
raise ValueError("Periodic callback must have a positive callback_time")
self.callback_time = callback_time
self.jitter = jitter
self._running = False
- self._timeout = None
+ self._timeout = None # type: object
- def start(self):
+ def start(self) -> None:
"""Starts the timer."""
# Looking up the IOLoop here allows to first instantiate the
# PeriodicCallback in another thread, then start it using
self._next_timeout = self.io_loop.time()
self._schedule_next()
- def stop(self):
+ def stop(self) -> None:
"""Stops the timer."""
self._running = False
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
- def is_running(self):
+ def is_running(self) -> bool:
"""Return True if this `.PeriodicCallback` has been started.
.. versionadded:: 4.1
"""
return self._running
- def _run(self):
+ def _run(self) -> None:
if not self._running:
return
try:
finally:
self._schedule_next()
- def _schedule_next(self):
+ def _schedule_next(self) -> None:
if self._running:
self._update_next(self.io_loop.time())
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
- def _update_next(self, current_time):
+ def _update_next(self, current_time: float) -> None:
callback_time_sec = self.callback_time / 1000.0
if self.jitter:
# apply jitter fraction
Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""
+import concurrent.futures
import functools
from tornado.gen import convert_yielded
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, _Selectable
import asyncio
+import typing
+from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
+if typing.TYPE_CHECKING:
+ from typing import Set, Dict, Tuple # noqa: F401
+
+_T = TypeVar('_T')
+
class BaseAsyncIOLoop(IOLoop):
- def initialize(self, asyncio_loop, **kwargs):
+ def initialize(self, asyncio_loop: asyncio.AbstractEventLoop, # type: ignore
+ **kwargs: Any) -> None:
self.asyncio_loop = asyncio_loop
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
- self.handlers = {}
+ self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
# Set of fds listening for reads/writes
- self.readers = set()
- self.writers = set()
+ self.readers = set() # type: Set[int]
+ self.writers = set() # type: Set[int]
self.closing = False
# If an asyncio loop was closed through an asyncio interface
# instead of IOLoop.close(), we'd never hear about it and may
IOLoop._ioloop_for_asyncio[asyncio_loop] = self
super(BaseAsyncIOLoop, self).initialize(**kwargs)
- def close(self, all_fds=False):
+ def close(self, all_fds: bool=False) -> None:
self.closing = True
for fd in list(self.handlers):
fileobj, handler_func = self.handlers[fd]
del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
self.asyncio_loop.close()
- def add_handler(self, fd, handler, events):
+ def add_handler(self, fd: Union[int, _Selectable],
+ handler: Callable[..., None], events: int) -> None:
fd, fileobj = self.split_fd(fd)
if fd in self.handlers:
raise ValueError("fd %s added twice" % fd)
fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
- def update_handler(self, fd, events):
+ def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
fd, fileobj = self.split_fd(fd)
if events & IOLoop.READ:
if fd not in self.readers:
self.asyncio_loop.remove_writer(fd)
self.writers.remove(fd)
- def remove_handler(self, fd):
+ def remove_handler(self, fd: Union[int, _Selectable]) -> None:
fd, fileobj = self.split_fd(fd)
if fd not in self.handlers:
return
self.writers.remove(fd)
del self.handlers[fd]
- def _handle_events(self, fd, events):
+ def _handle_events(self, fd: int, events: int) -> None:
fileobj, handler_func = self.handlers[fd]
handler_func(fileobj, events)
- def start(self):
+ def start(self) -> None:
try:
old_loop = asyncio.get_event_loop()
except (RuntimeError, AssertionError):
- old_loop = None
+ old_loop = None # type: ignore
try:
self._setup_logging()
asyncio.set_event_loop(self.asyncio_loop)
finally:
asyncio.set_event_loop(old_loop)
- def stop(self):
+ def stop(self) -> None:
self.asyncio_loop.stop()
- def call_at(self, when, callback, *args, **kwargs):
+ def call_at(self, when: float, callback: Callable[..., None],
+ *args: Any, **kwargs: Any) -> object:
# asyncio.call_at supports *args but not **kwargs, so bind them here.
# We do not synchronize self.time and asyncio_loop.time, so
# convert from absolute to relative.
max(0, when - self.time()), self._run_callback,
functools.partial(callback, *args, **kwargs))
- def remove_timeout(self, timeout):
- timeout.cancel()
+ def remove_timeout(self, timeout: object) -> None:
+ timeout.cancel() # type: ignore
- def add_callback(self, callback, *args, **kwargs):
+ def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
try:
self.asyncio_loop.call_soon_threadsafe(
self._run_callback,
add_callback_from_signal = add_callback
- def run_in_executor(self, executor, func, *args):
+ def run_in_executor(self, executor: Optional[concurrent.futures.Executor],
+ func: Callable[..., _T], *args: Any) -> Awaitable[_T]:
return self.asyncio_loop.run_in_executor(executor, func, *args)
- def set_default_executor(self, executor):
+ def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
return self.asyncio_loop.set_default_executor(executor)
Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop.
"""
- def initialize(self, **kwargs):
+ def initialize(self, **kwargs: Any) -> None: # type: ignore
super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)
- def make_current(self):
+ def make_current(self) -> None:
# AsyncIOMainLoop already refers to the current asyncio loop so
# nothing to do here.
pass
Now used automatically when appropriate; it is no longer necessary
to refer to this class directly.
"""
- def initialize(self, **kwargs):
+ def initialize(self, **kwargs: Any) -> None: # type: ignore
self.is_current = False
loop = asyncio.new_event_loop()
try:
loop.close()
raise
- def close(self, all_fds=False):
+ def close(self, all_fds: bool=False) -> None:
if self.is_current:
self.clear_current()
super(AsyncIOLoop, self).close(all_fds=all_fds)
- def make_current(self):
+ def make_current(self) -> None:
if not self.is_current:
try:
self.old_asyncio = asyncio.get_event_loop()
except (RuntimeError, AssertionError):
- self.old_asyncio = None
+ self.old_asyncio = None # type: ignore
self.is_current = True
asyncio.set_event_loop(self.asyncio_loop)
- def _clear_current_hook(self):
+ def _clear_current_hook(self) -> None:
if self.is_current:
asyncio.set_event_loop(self.old_asyncio)
self.is_current = False
-def to_tornado_future(asyncio_future):
+def to_tornado_future(asyncio_future: asyncio.Future) -> asyncio.Future:
"""Convert an `asyncio.Future` to a `tornado.concurrent.Future`.
.. versionadded:: 4.1
return asyncio_future
-def to_asyncio_future(tornado_future):
+def to_asyncio_future(tornado_future: asyncio.Future) -> asyncio.Future:
"""Convert a Tornado yieldable object to an `asyncio.Future`.
.. versionadded:: 4.1
.. versionadded:: 5.0
"""
- def get_event_loop(self):
+ def get_event_loop(self) -> asyncio.AbstractEventLoop:
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):