From: Ben Darnell Date: Sat, 11 Aug 2018 15:27:13 +0000 (-0400) Subject: gen: Type annotate the module X-Git-Tag: v6.0.0b1~35^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=467fec4b758fba442698c1ee252ed34fa90b1418;p=thirdparty%2Ftornado.git gen: Type annotate the module This is a tricky module to type. In some cases mypy just isn't able to express things (e.g. decorators that preserve the parameter types of their functions but change the return type), and in others I think it's possible to do better but I haven't figured out how (preserving types passed through with_timeout). --- diff --git a/setup.cfg b/setup.cfg index 9b302ac13..b88a5b393 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,6 +13,9 @@ disallow_untyped_defs = True [mypy-tornado.concurrent] disallow_untyped_defs = True +[mypy-tornado.gen] +disallow_untyped_defs = True + # It's generally too tedious to require type annotations in tests, but # we do want to type check them as much as type inference allows. [mypy-tornado.test.util_test] @@ -26,3 +29,6 @@ check_untyped_defs = True [mypy-tornado.test.concurrent_test] check_untyped_defs = True + +[mypy-tornado.test.gen_test] +check_untyped_defs = True diff --git a/tornado/gen.py b/tornado/gen.py index b7e84cc49..71c7f71bd 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -74,10 +74,13 @@ import asyncio import builtins import collections from collections.abc import Generator +import concurrent.futures +import datetime import functools from functools import singledispatch from inspect import isawaitable import sys +import types from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info, future_add_done_callback, future_set_result_unless_cancelled) @@ -85,6 +88,17 @@ from tornado.ioloop import IOLoop from tornado.log import app_log from tornado.util import TimeoutError +import typing +from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict + +if typing.TYPE_CHECKING: + from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401 + +_T = typing.TypeVar('_T') + +_Yieldable = Union[None, Awaitable, List[Awaitable], Dict[Any, Awaitable], + concurrent.futures.Future] + class KeyReuseError(Exception): pass @@ -106,7 +120,7 @@ class ReturnValueIgnoredError(Exception): pass -def _value_from_stopiteration(e): +def _value_from_stopiteration(e: Union[StopIteration, 'Return']) -> Any: try: # StopIteration has a value attribute beginning in py33. # So does our Return class. @@ -121,8 +135,8 @@ def _value_from_stopiteration(e): return None -def _create_future(): - future = Future() +def _create_future() -> Future: + future = Future() # type: Future # Fixup asyncio debug info by removing extraneous stack entries source_traceback = getattr(future, "_source_traceback", ()) while source_traceback: @@ -136,7 +150,7 @@ def _create_future(): return future -def coroutine(func): +def coroutine(func: Callable[..., _T]) -> Callable[..., 'Future[_T]']: """Decorator for asynchronous generators. Any generator that yields objects from this module must be wrapped @@ -170,6 +184,9 @@ def coroutine(func): """ @functools.wraps(func) def wrapper(*args, **kwargs): + # type: (*Any, **Any) -> Future[_T] + # This function is type-annotated with a comment to work around + # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in future = _create_future() try: result = func(*args, **kwargs) @@ -181,7 +198,7 @@ def coroutine(func): return future finally: # Avoid circular references - future = None + future = None # type: ignore else: if isinstance(result, Generator): # Inline the first iteration of Runner.run. This lets us @@ -218,16 +235,16 @@ def coroutine(func): # benchmarks (relative to the refcount-based scheme # used in the absence of cycles). We can avoid the # cycle by clearing the local variable after we return it. - future = None + future = None # type: ignore future_set_result_unless_cancelled(future, result) return future - wrapper.__wrapped__ = func - wrapper.__tornado_coroutine__ = True + wrapper.__wrapped__ = func # type: ignore + wrapper.__tornado_coroutine__ = True # type: ignore return wrapper -def is_coroutine_function(func): +def is_coroutine_function(func: Any) -> bool: """Return whether *func* is a coroutine function, i.e. a function wrapped with `~.gen.coroutine`. @@ -256,7 +273,7 @@ class Return(Exception): but it is never necessary to ``raise gen.Return()``. The ``return`` statement can be used with no arguments instead. """ - def __init__(self, value=None): + def __init__(self, value: Any=None) -> None: super(Return, self).__init__() self.value = value # Cython recognizes subclasses of StopIteration with a .args tuple. @@ -316,26 +333,30 @@ class WaitIterator(object): Added ``async for`` support in Python 3.5. """ - def __init__(self, *args, **kwargs): + + _unfinished = {} # type: Dict[Future, Union[int, str]] + + def __init__(self, *args: Future, **kwargs: Future) -> None: if args and kwargs: raise ValueError( "You must provide args or kwargs, not both") if kwargs: self._unfinished = dict((f, k) for (k, f) in kwargs.items()) - futures = list(kwargs.values()) + futures = list(kwargs.values()) # type: Sequence[Future] else: self._unfinished = dict((f, i) for (i, f) in enumerate(args)) futures = args - self._finished = collections.deque() - self.current_index = self.current_future = None - self._running_future = None + self._finished = collections.deque() # type: Deque[Future] + self.current_index = None # type: Optional[Union[str, int]] + self.current_future = None # type: Optional[Future] + self._running_future = None # type: Optional[Future] for future in futures: future_add_done_callback(future, self._done_callback) - def done(self): + def done(self) -> bool: """Returns True if this iterator has no more results.""" if self._finished or self._unfinished: return False @@ -343,7 +364,7 @@ class WaitIterator(object): self.current_index = self.current_future = None return True - def next(self): + def next(self) -> Future: """Returns a `.Future` that will yield the next available result. Note that this `.Future` will not be the same object as any of @@ -356,32 +377,37 @@ class WaitIterator(object): return self._running_future - def _done_callback(self, done): + def _done_callback(self, done: Future) -> None: if self._running_future and not self._running_future.done(): self._return_result(done) else: self._finished.append(done) - def _return_result(self, done): + def _return_result(self, done: Future) -> None: """Called set the returned future's state that of the future we yielded, and set the current future for the iterator. """ + if self._running_future is None: + raise Exception("no future is running") chain_future(done, self._running_future) self.current_future = done self.current_index = self._unfinished.pop(done) - def __aiter__(self): + def __aiter__(self) -> typing.AsyncIterator: return self - def __anext__(self): + def __anext__(self) -> Future: if self.done(): # Lookup by name to silence pyflakes on older versions. raise getattr(builtins, 'StopAsyncIteration')() return self.next() -def multi(children, quiet_exceptions=()): +def multi( + children: Union[List[_Yieldable], Dict[Any, _Yieldable]], + quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(), +) -> Union['Future[List]', 'Future[Dict]']: """Runs multiple asynchronous operations in parallel. ``children`` may either be a list or a dict whose values are @@ -432,7 +458,10 @@ def multi(children, quiet_exceptions=()): Multi = multi -def multi_future(children, quiet_exceptions=()): +def multi_future( + children: Union[List[_Yieldable], Dict[Any, _Yieldable]], + quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(), +) -> Union['Future[List]', 'Future[Dict]']: """Wait for multiple asynchronous futures in parallel. Since Tornado 6.0, this function is exactly the same as `multi`. @@ -448,24 +477,25 @@ def multi_future(children, quiet_exceptions=()): Use `multi` instead. """ if isinstance(children, dict): - keys = list(children.keys()) - children = children.values() + keys = list(children.keys()) # type: Optional[List] + children_seq = children.values() # type: Iterable else: keys = None - children = list(map(convert_yielded, children)) - assert all(is_future(i) or isinstance(i, _NullFuture) for i in children) - unfinished_children = set(children) + children_seq = children + children_futs = list(map(convert_yielded, children_seq)) + assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs) + unfinished_children = set(children_futs) future = _create_future() - if not children: + if not children_futs: future_set_result_unless_cancelled(future, {} if keys is not None else []) - def callback(f): - unfinished_children.remove(f) + def callback(fut: Future) -> None: + unfinished_children.remove(fut) if not unfinished_children: result_list = [] - for f in children: + for f in children_futs: try: result_list.append(f.result()) except Exception as e: @@ -482,15 +512,15 @@ def multi_future(children, quiet_exceptions=()): else: future_set_result_unless_cancelled(future, result_list) - listening = set() - for f in children: + listening = set() # type: Set[Future] + for f in children_futs: if f not in listening: listening.add(f) future_add_done_callback(f, callback) return future -def maybe_future(x): +def maybe_future(x: Any) -> Future: """Converts ``x`` into a `.Future`. If ``x`` is already a `.Future`, it is simply returned; otherwise @@ -511,7 +541,10 @@ def maybe_future(x): return fut -def with_timeout(timeout, future, quiet_exceptions=()): +def with_timeout( + timeout: Union[float, datetime.timedelta], future: _Yieldable, + quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]]=(), +) -> Future: """Wraps a `.Future` (or other yieldable object) in a timeout. Raises `tornado.util.TimeoutError` if the input future does not @@ -542,12 +575,12 @@ def with_timeout(timeout, future, quiet_exceptions=()): # one waiting on the input future, so cancelling it might disrupt other # callers and B) concurrent futures can only be cancelled while they are # in the queue, so cancellation cannot reliably bound our waiting time. - future = convert_yielded(future) + future_converted = convert_yielded(future) result = _create_future() - chain_future(future, result) + chain_future(future_converted, result) io_loop = IOLoop.current() - def error_callback(future): + def error_callback(future: Future) -> None: try: future.result() except Exception as e: @@ -555,28 +588,28 @@ def with_timeout(timeout, future, quiet_exceptions=()): app_log.error("Exception in Future %r after timeout", future, exc_info=True) - def timeout_callback(): + def timeout_callback() -> None: if not result.done(): result.set_exception(TimeoutError("Timeout")) # In case the wrapped future goes on to fail, log it. - future_add_done_callback(future, error_callback) + future_add_done_callback(future_converted, error_callback) timeout_handle = io_loop.add_timeout( timeout, timeout_callback) - if isinstance(future, Future): + if isinstance(future_converted, Future): # We know this future will resolve on the IOLoop, so we don't # need the extra thread-safety of IOLoop.add_future (and we also # don't care about StackContext here. future_add_done_callback( - future, lambda future: io_loop.remove_timeout(timeout_handle)) + future_converted, lambda future: io_loop.remove_timeout(timeout_handle)) else: # concurrent.futures.Futures may resolve on any thread, so we # need to route them back to the IOLoop. io_loop.add_future( - future, lambda future: io_loop.remove_timeout(timeout_handle)) + future_converted, lambda future: io_loop.remove_timeout(timeout_handle)) return result -def sleep(duration): +def sleep(duration: float) -> 'Future[None]': """Return a `.Future` that resolves after the given number of seconds. When used with ``yield`` in a coroutine, this is a non-blocking @@ -601,20 +634,26 @@ class _NullFuture(object): It's not actually a `Future` to avoid depending on a particular event loop. Handled as a special case in the coroutine runner. + + We lie and tell the type checker that a _NullFuture is a Future so + we don't have to leak _NullFuture into lots of public APIs. But + this means that the type checker can't warn us when we're passing + a _NullFuture into a code path that doesn't understand what to do + with it. """ - def result(self): + def result(self) -> None: return None - def done(self): + def done(self) -> bool: return True # _null_future is used as a dummy value in the coroutine runner. It differs # from moment in that moment always adds a delay of one IOLoop iteration # while _null_future is processed as soon as possible. -_null_future = _NullFuture() +_null_future = typing.cast(Future, _NullFuture()) -moment = _NullFuture() +moment = typing.cast(Future, _NullFuture()) moment.__doc__ = \ """A special object which may be yielded to allow the IOLoop to run for one iteration. @@ -640,18 +679,19 @@ class Runner(object): The results of the generator are stored in ``result_future`` (a `.Future`) """ - def __init__(self, gen, result_future, first_yielded): + def __init__(self, gen: 'Generator[_Yieldable, Any, _T]', result_future: 'Future[_T]', + first_yielded: _Yieldable) -> None: self.gen = gen self.result_future = result_future - self.future = _null_future + self.future = _null_future # type: Union[None, Future] self.running = False self.finished = False self.io_loop = IOLoop.current() if self.handle_yield(first_yielded): - gen = result_future = first_yielded = None + gen = result_future = first_yielded = None # type: ignore self.run() - def run(self): + def run(self) -> None: """Starts or resumes the generator, running until it reaches a yield point that is not ready. """ @@ -661,6 +701,8 @@ class Runner(object): self.running = True while True: future = self.future + if future is None: + raise Exception("No pending future") if not future.done(): return self.future = None @@ -675,7 +717,7 @@ class Runner(object): if exc_info is not None: try: - yielded = self.gen.throw(*exc_info) + yielded = self.gen.throw(*exc_info) # type: ignore finally: # Break up a reference to itself # for faster GC on CPython. @@ -688,13 +730,13 @@ class Runner(object): self.future = _null_future future_set_result_unless_cancelled(self.result_future, _value_from_stopiteration(e)) - self.result_future = None + self.result_future = None # type: ignore return except Exception: self.finished = True self.future = _null_future future_set_exc_info(self.result_future, sys.exc_info()) - self.result_future = None + self.result_future = None # type: ignore return if not self.handle_yield(yielded): return @@ -702,7 +744,7 @@ class Runner(object): finally: self.running = False - def handle_yield(self, yielded): + def handle_yield(self, yielded: _Yieldable) -> bool: try: self.future = convert_yielded(yielded) except BadYieldError: @@ -712,8 +754,10 @@ class Runner(object): if self.future is moment: self.io_loop.add_callback(self.run) return False + elif self.future is None: + raise Exception("no pending future") elif not self.future.done(): - def inner(f): + def inner(f: Any) -> None: # Break a reference cycle to speed GC. f = None # noqa self.run() @@ -722,7 +766,8 @@ class Runner(object): return False return True - def handle_exception(self, typ, value, tb): + def handle_exception(self, typ: Type[Exception], value: Exception, + tb: types.TracebackType) -> bool: if not self.running and not self.finished: self.future = Future() future_set_exc_info(self.future, (typ, value, tb)) @@ -741,7 +786,7 @@ except AttributeError: _wrap_awaitable = getattr(asyncio, 'async') -def convert_yielded(yielded): +def convert_yielded(yielded: _Yieldable) -> Future: """Convert a yielded object into a `.Future`. The default implementation accepts lists, dictionaries, and Futures. @@ -760,11 +805,11 @@ def convert_yielded(yielded): elif yielded is _null_future: return _null_future elif isinstance(yielded, (list, dict)): - return multi(yielded) + return multi(yielded) # type: ignore elif is_future(yielded): - return yielded + return typing.cast(Future, yielded) elif isawaitable(yielded): - return _wrap_awaitable(yielded) + return _wrap_awaitable(yielded) # type: ignore else: raise BadYieldError("yielded unknown object %r" % (yielded,)) diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py index 9e6cb095e..dcc50583b 100644 --- a/tornado/test/gen_test.py +++ b/tornado/test/gen_test.py @@ -16,6 +16,11 @@ from tornado.web import Application, RequestHandler, HTTPError from tornado import gen +import typing + +if typing.TYPE_CHECKING: + from typing import List, Optional # noqa: F401 + class GenBasicTest(AsyncTestCase): @gen.coroutine @@ -268,7 +273,7 @@ class GenCoroutineTest(AsyncTestCase): coro = gen.coroutine(f) self.assertEqual(coro.__name__, f.__name__) self.assertEqual(coro.__module__, f.__module__) - self.assertIs(coro.__wrapped__, f) + self.assertIs(coro.__wrapped__, f) # type: ignore def test_is_coroutine_function(self): self.finished = True @@ -484,7 +489,7 @@ class GenCoroutineTest(AsyncTestCase): yield yieldable # First, confirm the behavior without moment: each coroutine # monopolizes the event loop until it finishes. - immediate = Future() + immediate = Future() # type: Future[None] immediate.set_result(None) yield [f('a', immediate), f('b', immediate)] self.assertEqual(''.join(calls), 'aaaaabbbbb') @@ -539,7 +544,10 @@ class GenCoroutineTest(AsyncTestCase): pass local_var = Foo() self.local_ref = weakref.ref(local_var) - yield gen.coroutine(lambda: None)() + + def dummy(): + pass + yield gen.coroutine(dummy)() raise ValueError('Some error') @gen.coroutine @@ -610,20 +618,11 @@ class GenCoroutineUnfinishedSequenceHandler(RequestHandler): self.write("3") -class GenCoroutineExceptionHandler(RequestHandler): - @gen.coroutine - def get(self): - # This test depends on the order of the two decorators. - io_loop = self.request.connection.stream.io_loop - yield gen.Task(io_loop.add_callback) - raise Exception("oops") - - # "Undecorated" here refers to the absence of @asynchronous. class UndecoratedCoroutinesHandler(RequestHandler): @gen.coroutine def prepare(self): - self.chunks = [] + self.chunks = [] # type: List[str] yield gen.moment self.chunks.append('1') @@ -658,7 +657,6 @@ class GenWebTest(AsyncHTTPTestCase): ('/coroutine_sequence', GenCoroutineSequenceHandler), ('/coroutine_unfinished_sequence', GenCoroutineUnfinishedSequenceHandler), - ('/coroutine_exception', GenCoroutineExceptionHandler), ('/undecorated_coroutine', UndecoratedCoroutinesHandler), ('/async_prepare_error', AsyncPrepareErrorHandler), ('/native_coroutine', NativeCoroutineHandler), @@ -672,12 +670,6 @@ class GenWebTest(AsyncHTTPTestCase): response = self.fetch('/coroutine_unfinished_sequence') self.assertEqual(response.body, b"123") - def test_coroutine_exception_handler(self): - # Make sure we get an error and not a timeout - with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"): - response = self.fetch('/coroutine_exception') - self.assertEqual(500, response.code) - def test_undecorated_coroutines(self): response = self.fetch('/undecorated_coroutine') self.assertEqual(response.body, b'123') @@ -701,7 +693,7 @@ class WithTimeoutTest(AsyncTestCase): @gen_test def test_completes_before_timeout(self): - future = Future() + future = Future() # type: Future[str] self.io_loop.add_timeout(datetime.timedelta(seconds=0.1), lambda: future.set_result('asdf')) result = yield gen.with_timeout(datetime.timedelta(seconds=3600), @@ -710,7 +702,7 @@ class WithTimeoutTest(AsyncTestCase): @gen_test def test_fails_before_timeout(self): - future = Future() + future = Future() # type: Future[str] self.io_loop.add_timeout( datetime.timedelta(seconds=0.1), lambda: future.set_exception(ZeroDivisionError())) @@ -720,7 +712,7 @@ class WithTimeoutTest(AsyncTestCase): @gen_test def test_already_resolved(self): - future = Future() + future = Future() # type: Future[str] future.set_result('asdf') result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future) @@ -739,7 +731,9 @@ class WithTimeoutTest(AsyncTestCase): # A concurrent future that is resolved before we even submit it # to with_timeout. with futures.ThreadPoolExecutor(1) as executor: - f = executor.submit(lambda: None) + def dummy(): + pass + f = executor.submit(dummy) f.result() # wait for completion yield gen.with_timeout(datetime.timedelta(seconds=3600), f) @@ -758,16 +752,16 @@ class WaitIteratorTest(AsyncTestCase): self.assertTrue(g.done(), 'empty generator iterated') with self.assertRaises(ValueError): - g = gen.WaitIterator(False, bar=False) + g = gen.WaitIterator(Future(), bar=Future()) self.assertEqual(g.current_index, None, "bad nil current index") self.assertEqual(g.current_future, None, "bad nil current future") @gen_test def test_already_done(self): - f1 = Future() - f2 = Future() - f3 = Future() + f1 = Future() # type: Future[int] + f2 = Future() # type: Future[int] + f3 = Future() # type: Future[int] f1.set_result(24) f2.set_result(42) f3.set_result(84) @@ -828,7 +822,7 @@ class WaitIteratorTest(AsyncTestCase): @gen_test def test_iterator(self): - futures = [Future(), Future(), Future(), Future()] + futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]] self.finish_coroutines(0, futures) @@ -858,7 +852,7 @@ class WaitIteratorTest(AsyncTestCase): # Recreate the previous test with py35 syntax. It's a little clunky # because of the way the previous test handles an exception on # a single iteration. - futures = [Future(), Future(), Future(), Future()] + futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]] self.finish_coroutines(0, futures) self.finished = False @@ -908,15 +902,15 @@ class RunnerGCTest(AsyncTestCase): def test_gc(self): # Github issue 1769: Runner objects can get GCed unexpectedly # while their future is alive. - weakref_scope = [None] + weakref_scope = [None] # type: List[Optional[weakref.ReferenceType]] def callback(): gc.collect(2) - weakref_scope[0]().set_result(123) + weakref_scope[0]().set_result(123) # type: ignore @gen.coroutine def tester(): - fut = Future() + fut = Future() # type: Future[int] weakref_scope[0] = weakref.ref(fut) self.io_loop.add_callback(callback) yield fut @@ -931,7 +925,7 @@ class RunnerGCTest(AsyncTestCase): # their loop is closed, even if they're involved in a reference # cycle. loop = self.get_new_ioloop() - result = [] + result = [] # type: List[Optional[bool]] wfut = [] @gen.coroutine @@ -947,7 +941,7 @@ class RunnerGCTest(AsyncTestCase): @gen.coroutine def do_something(): fut = infinite_coro() - fut._refcycle = fut + fut._refcycle = fut # type: ignore wfut.append(weakref.ref(fut)) yield gen.sleep(0.2) @@ -976,13 +970,13 @@ class RunnerGCTest(AsyncTestCase): result.append(None) loop = self.get_new_ioloop() - result = [] + result = [] # type: List[Optional[bool]] wfut = [] @gen.coroutine def do_something(): fut = asyncio.get_event_loop().create_task(infinite_coro(result)) - fut._refcycle = fut + fut._refcycle = fut # type: ignore wfut.append(weakref.ref(fut)) yield gen.sleep(0.2)