import builtins
import collections
from collections.abc import Generator
+import concurrent.futures
+import datetime
import functools
from functools import singledispatch
from inspect import isawaitable
import sys
+import types
from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info,
future_add_done_callback, future_set_result_unless_cancelled)
from tornado.log import app_log
from tornado.util import TimeoutError
+import typing
+from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict
+
+if typing.TYPE_CHECKING:
+ from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401
+
+_T = typing.TypeVar('_T')
+
+_Yieldable = Union[None, Awaitable, List[Awaitable], Dict[Any, Awaitable],
+ concurrent.futures.Future]
+
class KeyReuseError(Exception):
pass
pass
-def _value_from_stopiteration(e):
+def _value_from_stopiteration(e: Union[StopIteration, 'Return']) -> Any:
try:
# StopIteration has a value attribute beginning in py33.
# So does our Return class.
return None
-def _create_future():
- future = Future()
+def _create_future() -> Future:
+ future = Future() # type: Future
# Fixup asyncio debug info by removing extraneous stack entries
source_traceback = getattr(future, "_source_traceback", ())
while source_traceback:
return future
-def coroutine(func):
+def coroutine(func: Callable[..., _T]) -> Callable[..., 'Future[_T]']:
"""Decorator for asynchronous generators.
Any generator that yields objects from this module must be wrapped
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
+ # type: (*Any, **Any) -> Future[_T]
+ # This function is type-annotated with a comment to work around
+ # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
future = _create_future()
try:
result = func(*args, **kwargs)
return future
finally:
# Avoid circular references
- future = None
+ future = None # type: ignore
else:
if isinstance(result, Generator):
# Inline the first iteration of Runner.run. This lets us
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
- future = None
+ future = None # type: ignore
future_set_result_unless_cancelled(future, result)
return future
- wrapper.__wrapped__ = func
- wrapper.__tornado_coroutine__ = True
+ wrapper.__wrapped__ = func # type: ignore
+ wrapper.__tornado_coroutine__ = True # type: ignore
return wrapper
-def is_coroutine_function(func):
+def is_coroutine_function(func: Any) -> bool:
"""Return whether *func* is a coroutine function, i.e. a function
wrapped with `~.gen.coroutine`.
but it is never necessary to ``raise gen.Return()``. The ``return``
statement can be used with no arguments instead.
"""
- def __init__(self, value=None):
+ def __init__(self, value: Any=None) -> None:
super(Return, self).__init__()
self.value = value
# Cython recognizes subclasses of StopIteration with a .args tuple.
Added ``async for`` support in Python 3.5.
"""
- def __init__(self, *args, **kwargs):
+
+ _unfinished = {} # type: Dict[Future, Union[int, str]]
+
+ def __init__(self, *args: Future, **kwargs: Future) -> None:
if args and kwargs:
raise ValueError(
"You must provide args or kwargs, not both")
if kwargs:
self._unfinished = dict((f, k) for (k, f) in kwargs.items())
- futures = list(kwargs.values())
+ futures = list(kwargs.values()) # type: Sequence[Future]
else:
self._unfinished = dict((f, i) for (i, f) in enumerate(args))
futures = args
- self._finished = collections.deque()
- self.current_index = self.current_future = None
- self._running_future = None
+ self._finished = collections.deque() # type: Deque[Future]
+ self.current_index = None # type: Optional[Union[str, int]]
+ self.current_future = None # type: Optional[Future]
+ self._running_future = None # type: Optional[Future]
for future in futures:
future_add_done_callback(future, self._done_callback)
- def done(self):
+ def done(self) -> bool:
"""Returns True if this iterator has no more results."""
if self._finished or self._unfinished:
return False
self.current_index = self.current_future = None
return True
- def next(self):
+ def next(self) -> Future:
"""Returns a `.Future` that will yield the next available result.
Note that this `.Future` will not be the same object as any of
return self._running_future
- def _done_callback(self, done):
+ def _done_callback(self, done: Future) -> None:
if self._running_future and not self._running_future.done():
self._return_result(done)
else:
self._finished.append(done)
- def _return_result(self, done):
+ def _return_result(self, done: Future) -> None:
"""Called set the returned future's state that of the future
we yielded, and set the current future for the iterator.
"""
+ if self._running_future is None:
+ raise Exception("no future is running")
chain_future(done, self._running_future)
self.current_future = done
self.current_index = self._unfinished.pop(done)
- def __aiter__(self):
+ def __aiter__(self) -> typing.AsyncIterator:
return self
- def __anext__(self):
+ def __anext__(self) -> Future:
if self.done():
# Lookup by name to silence pyflakes on older versions.
raise getattr(builtins, 'StopAsyncIteration')()
return self.next()
-def multi(children, quiet_exceptions=()):
+def multi(
+ children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
+ quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(),
+) -> Union['Future[List]', 'Future[Dict]']:
"""Runs multiple asynchronous operations in parallel.
``children`` may either be a list or a dict whose values are
Multi = multi
-def multi_future(children, quiet_exceptions=()):
+def multi_future(
+ children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
+ quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(),
+) -> Union['Future[List]', 'Future[Dict]']:
"""Wait for multiple asynchronous futures in parallel.
Since Tornado 6.0, this function is exactly the same as `multi`.
Use `multi` instead.
"""
if isinstance(children, dict):
- keys = list(children.keys())
- children = children.values()
+ keys = list(children.keys()) # type: Optional[List]
+ children_seq = children.values() # type: Iterable
else:
keys = None
- children = list(map(convert_yielded, children))
- assert all(is_future(i) or isinstance(i, _NullFuture) for i in children)
- unfinished_children = set(children)
+ children_seq = children
+ children_futs = list(map(convert_yielded, children_seq))
+ assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs)
+ unfinished_children = set(children_futs)
future = _create_future()
- if not children:
+ if not children_futs:
future_set_result_unless_cancelled(future,
{} if keys is not None else [])
- def callback(f):
- unfinished_children.remove(f)
+ def callback(fut: Future) -> None:
+ unfinished_children.remove(fut)
if not unfinished_children:
result_list = []
- for f in children:
+ for f in children_futs:
try:
result_list.append(f.result())
except Exception as e:
else:
future_set_result_unless_cancelled(future, result_list)
- listening = set()
- for f in children:
+ listening = set() # type: Set[Future]
+ for f in children_futs:
if f not in listening:
listening.add(f)
future_add_done_callback(f, callback)
return future
-def maybe_future(x):
+def maybe_future(x: Any) -> Future:
"""Converts ``x`` into a `.Future`.
If ``x`` is already a `.Future`, it is simply returned; otherwise
return fut
-def with_timeout(timeout, future, quiet_exceptions=()):
+def with_timeout(
+ timeout: Union[float, datetime.timedelta], future: _Yieldable,
+ quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(),
+) -> Future:
"""Wraps a `.Future` (or other yieldable object) in a timeout.
Raises `tornado.util.TimeoutError` if the input future does not
# one waiting on the input future, so cancelling it might disrupt other
# callers and B) concurrent futures can only be cancelled while they are
# in the queue, so cancellation cannot reliably bound our waiting time.
- future = convert_yielded(future)
+ future_converted = convert_yielded(future)
result = _create_future()
- chain_future(future, result)
+ chain_future(future_converted, result)
io_loop = IOLoop.current()
- def error_callback(future):
+ def error_callback(future: Future) -> None:
try:
future.result()
except Exception as e:
app_log.error("Exception in Future %r after timeout",
future, exc_info=True)
- def timeout_callback():
+ def timeout_callback() -> None:
if not result.done():
result.set_exception(TimeoutError("Timeout"))
# In case the wrapped future goes on to fail, log it.
- future_add_done_callback(future, error_callback)
+ future_add_done_callback(future_converted, error_callback)
timeout_handle = io_loop.add_timeout(
timeout, timeout_callback)
- if isinstance(future, Future):
+ if isinstance(future_converted, Future):
# We know this future will resolve on the IOLoop, so we don't
# need the extra thread-safety of IOLoop.add_future (and we also
# don't care about StackContext here.
future_add_done_callback(
- future, lambda future: io_loop.remove_timeout(timeout_handle))
+ future_converted, lambda future: io_loop.remove_timeout(timeout_handle))
else:
# concurrent.futures.Futures may resolve on any thread, so we
# need to route them back to the IOLoop.
io_loop.add_future(
- future, lambda future: io_loop.remove_timeout(timeout_handle))
+ future_converted, lambda future: io_loop.remove_timeout(timeout_handle))
return result
-def sleep(duration):
+def sleep(duration: float) -> 'Future[None]':
"""Return a `.Future` that resolves after the given number of seconds.
When used with ``yield`` in a coroutine, this is a non-blocking
It's not actually a `Future` to avoid depending on a particular event loop.
Handled as a special case in the coroutine runner.
+
+ We lie and tell the type checker that a _NullFuture is a Future so
+ we don't have to leak _NullFuture into lots of public APIs. But
+ this means that the type checker can't warn us when we're passing
+ a _NullFuture into a code path that doesn't understand what to do
+ with it.
"""
- def result(self):
+ def result(self) -> None:
return None
- def done(self):
+ def done(self) -> bool:
return True
# _null_future is used as a dummy value in the coroutine runner. It differs
# from moment in that moment always adds a delay of one IOLoop iteration
# while _null_future is processed as soon as possible.
-_null_future = _NullFuture()
+_null_future = typing.cast(Future, _NullFuture())
-moment = _NullFuture()
+moment = typing.cast(Future, _NullFuture())
moment.__doc__ = \
"""A special object which may be yielded to allow the IOLoop to run for
one iteration.
The results of the generator are stored in ``result_future`` (a
`.Future`)
"""
- def __init__(self, gen, result_future, first_yielded):
+ def __init__(self, gen: 'Generator[_Yieldable, Any, _T]', result_future: 'Future[_T]',
+ first_yielded: _Yieldable) -> None:
self.gen = gen
self.result_future = result_future
- self.future = _null_future
+ self.future = _null_future # type: Union[None, Future]
self.running = False
self.finished = False
self.io_loop = IOLoop.current()
if self.handle_yield(first_yielded):
- gen = result_future = first_yielded = None
+ gen = result_future = first_yielded = None # type: ignore
self.run()
- def run(self):
+ def run(self) -> None:
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
self.running = True
while True:
future = self.future
+ if future is None:
+ raise Exception("No pending future")
if not future.done():
return
self.future = None
if exc_info is not None:
try:
- yielded = self.gen.throw(*exc_info)
+ yielded = self.gen.throw(*exc_info) # type: ignore
finally:
# Break up a reference to itself
# for faster GC on CPython.
self.future = _null_future
future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
- self.result_future = None
+ self.result_future = None # type: ignore
return
except Exception:
self.finished = True
self.future = _null_future
future_set_exc_info(self.result_future, sys.exc_info())
- self.result_future = None
+ self.result_future = None # type: ignore
return
if not self.handle_yield(yielded):
return
finally:
self.running = False
- def handle_yield(self, yielded):
+ def handle_yield(self, yielded: _Yieldable) -> bool:
try:
self.future = convert_yielded(yielded)
except BadYieldError:
if self.future is moment:
self.io_loop.add_callback(self.run)
return False
+ elif self.future is None:
+ raise Exception("no pending future")
elif not self.future.done():
- def inner(f):
+ def inner(f: Any) -> None:
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
return False
return True
- def handle_exception(self, typ, value, tb):
+ def handle_exception(self, typ: Type[Exception], value: Exception,
+ tb: types.TracebackType) -> bool:
if not self.running and not self.finished:
self.future = Future()
future_set_exc_info(self.future, (typ, value, tb))
_wrap_awaitable = getattr(asyncio, 'async')
-def convert_yielded(yielded):
+def convert_yielded(yielded: _Yieldable) -> Future:
"""Convert a yielded object into a `.Future`.
The default implementation accepts lists, dictionaries, and Futures.
elif yielded is _null_future:
return _null_future
elif isinstance(yielded, (list, dict)):
- return multi(yielded)
+ return multi(yielded) # type: ignore
elif is_future(yielded):
- return yielded
+ return typing.cast(Future, yielded)
elif isawaitable(yielded):
- return _wrap_awaitable(yielded)
+ return _wrap_awaitable(yielded) # type: ignore
else:
raise BadYieldError("yielded unknown object %r" % (yielded,))
from tornado import gen
+import typing
+
+if typing.TYPE_CHECKING:
+ from typing import List, Optional # noqa: F401
+
class GenBasicTest(AsyncTestCase):
@gen.coroutine
coro = gen.coroutine(f)
self.assertEqual(coro.__name__, f.__name__)
self.assertEqual(coro.__module__, f.__module__)
- self.assertIs(coro.__wrapped__, f)
+ self.assertIs(coro.__wrapped__, f) # type: ignore
def test_is_coroutine_function(self):
self.finished = True
yield yieldable
# First, confirm the behavior without moment: each coroutine
# monopolizes the event loop until it finishes.
- immediate = Future()
+ immediate = Future() # type: Future[None]
immediate.set_result(None)
yield [f('a', immediate), f('b', immediate)]
self.assertEqual(''.join(calls), 'aaaaabbbbb')
pass
local_var = Foo()
self.local_ref = weakref.ref(local_var)
- yield gen.coroutine(lambda: None)()
+
+ def dummy():
+ pass
+ yield gen.coroutine(dummy)()
raise ValueError('Some error')
@gen.coroutine
self.write("3")
-class GenCoroutineExceptionHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- # This test depends on the order of the two decorators.
- io_loop = self.request.connection.stream.io_loop
- yield gen.Task(io_loop.add_callback)
- raise Exception("oops")
-
-
# "Undecorated" here refers to the absence of @asynchronous.
class UndecoratedCoroutinesHandler(RequestHandler):
@gen.coroutine
def prepare(self):
- self.chunks = []
+ self.chunks = [] # type: List[str]
yield gen.moment
self.chunks.append('1')
('/coroutine_sequence', GenCoroutineSequenceHandler),
('/coroutine_unfinished_sequence',
GenCoroutineUnfinishedSequenceHandler),
- ('/coroutine_exception', GenCoroutineExceptionHandler),
('/undecorated_coroutine', UndecoratedCoroutinesHandler),
('/async_prepare_error', AsyncPrepareErrorHandler),
('/native_coroutine', NativeCoroutineHandler),
response = self.fetch('/coroutine_unfinished_sequence')
self.assertEqual(response.body, b"123")
- def test_coroutine_exception_handler(self):
- # Make sure we get an error and not a timeout
- with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
- response = self.fetch('/coroutine_exception')
- self.assertEqual(500, response.code)
-
def test_undecorated_coroutines(self):
response = self.fetch('/undecorated_coroutine')
self.assertEqual(response.body, b'123')
@gen_test
def test_completes_before_timeout(self):
- future = Future()
+ future = Future() # type: Future[str]
self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
lambda: future.set_result('asdf'))
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
@gen_test
def test_fails_before_timeout(self):
- future = Future()
+ future = Future() # type: Future[str]
self.io_loop.add_timeout(
datetime.timedelta(seconds=0.1),
lambda: future.set_exception(ZeroDivisionError()))
@gen_test
def test_already_resolved(self):
- future = Future()
+ future = Future() # type: Future[str]
future.set_result('asdf')
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
future)
# A concurrent future that is resolved before we even submit it
# to with_timeout.
with futures.ThreadPoolExecutor(1) as executor:
- f = executor.submit(lambda: None)
+ def dummy():
+ pass
+ f = executor.submit(dummy)
f.result() # wait for completion
yield gen.with_timeout(datetime.timedelta(seconds=3600), f)
self.assertTrue(g.done(), 'empty generator iterated')
with self.assertRaises(ValueError):
- g = gen.WaitIterator(False, bar=False)
+ g = gen.WaitIterator(Future(), bar=Future())
self.assertEqual(g.current_index, None, "bad nil current index")
self.assertEqual(g.current_future, None, "bad nil current future")
@gen_test
def test_already_done(self):
- f1 = Future()
- f2 = Future()
- f3 = Future()
+ f1 = Future() # type: Future[int]
+ f2 = Future() # type: Future[int]
+ f3 = Future() # type: Future[int]
f1.set_result(24)
f2.set_result(42)
f3.set_result(84)
@gen_test
def test_iterator(self):
- futures = [Future(), Future(), Future(), Future()]
+ futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
self.finish_coroutines(0, futures)
# Recreate the previous test with py35 syntax. It's a little clunky
# because of the way the previous test handles an exception on
# a single iteration.
- futures = [Future(), Future(), Future(), Future()]
+ futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
self.finish_coroutines(0, futures)
self.finished = False
def test_gc(self):
# Github issue 1769: Runner objects can get GCed unexpectedly
# while their future is alive.
- weakref_scope = [None]
+ weakref_scope = [None] # type: List[Optional[weakref.ReferenceType]]
def callback():
gc.collect(2)
- weakref_scope[0]().set_result(123)
+ weakref_scope[0]().set_result(123) # type: ignore
@gen.coroutine
def tester():
- fut = Future()
+ fut = Future() # type: Future[int]
weakref_scope[0] = weakref.ref(fut)
self.io_loop.add_callback(callback)
yield fut
# their loop is closed, even if they're involved in a reference
# cycle.
loop = self.get_new_ioloop()
- result = []
+ result = [] # type: List[Optional[bool]]
wfut = []
@gen.coroutine
@gen.coroutine
def do_something():
fut = infinite_coro()
- fut._refcycle = fut
+ fut._refcycle = fut # type: ignore
wfut.append(weakref.ref(fut))
yield gen.sleep(0.2)
result.append(None)
loop = self.get_new_ioloop()
- result = []
+ result = [] # type: List[Optional[bool]]
wfut = []
@gen.coroutine
def do_something():
fut = asyncio.get_event_loop().create_task(infinite_coro(result))
- fut._refcycle = fut
+ fut._refcycle = fut # type: ignore
wfut.append(weakref.ref(fut))
yield gen.sleep(0.2)