from concurrent import futures
import functools
import sys
+import types
+
+import typing
+from typing import Any, Callable, Optional, Tuple
+
+_T = typing.TypeVar('_T')
class ReturnValueIgnoredError(Exception):
FUTURES = (futures.Future, Future)
-def is_future(x):
+def is_future(x: Any) -> bool:
return isinstance(x, FUTURES)
class DummyExecutor(object):
- def submit(self, fn, *args, **kwargs):
- future = Future()
+ def submit(self, fn: Callable[..., _T], *args: Any, **kwargs: Any) -> 'Future[_T]':
+ future = Future() # type: Future
try:
future_set_result_unless_cancelled(future, fn(*args, **kwargs))
except Exception:
future_set_exc_info(future, sys.exc_info())
return future
- def shutdown(self, wait=True):
+ def shutdown(self, wait: bool=True) -> None:
pass
dummy_executor = DummyExecutor()
-def run_on_executor(*args, **kwargs):
+def run_on_executor(*args: Any, **kwargs: Any) -> Callable:
"""Decorator to run a synchronous method asynchronously on an executor.
The decorated method may be called with a ``callback`` keyword
The ``callback`` argument was removed.
"""
- def run_on_executor_decorator(fn):
+ # Fully type-checking decorators is tricky, and this one is
+ # discouraged anyway so it doesn't have all the generic magic.
+ def run_on_executor_decorator(fn: Callable) -> Callable[..., Future]:
executor = kwargs.get("executor", "executor")
@functools.wraps(fn)
- def wrapper(self, *args, **kwargs):
- async_future = Future()
+ def wrapper(self: Any, *args: Any, **kwargs: Any) -> Future:
+ async_future = Future() # type: Future
conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
chain_future(conc_future, async_future)
return async_future
_NO_RESULT = object()
-def chain_future(a, b):
+def chain_future(a: 'Future[_T]', b: 'Future[_T]') -> None:
"""Chain two futures together so that when one completes, so does the other.
The result (success or failure) of ``a`` will be copied to ``b``, unless
`concurrent.futures.Future`.
"""
- def copy(future):
+ def copy(future: 'Future[_T]') -> None:
assert future is a
if b.done():
return
if (hasattr(a, 'exc_info') and
- a.exc_info() is not None):
- future_set_exc_info(b, a.exc_info())
+ a.exc_info() is not None): # type: ignore
+ future_set_exc_info(b, a.exc_info()) # type: ignore
elif a.exception() is not None:
b.set_exception(a.exception())
else:
IOLoop.current().add_future(a, copy)
-def future_set_result_unless_cancelled(future, value):
+def future_set_result_unless_cancelled(future: 'Future[_T]', value: _T) -> None:
"""Set the given ``value`` as the `Future`'s result, if not cancelled.
Avoids asyncio.InvalidStateError when calling set_result() on
future.set_result(value)
-def future_set_exc_info(future, exc_info):
+def future_set_exc_info(future: 'Future[_T]',
+ exc_info: Tuple[Optional[type], Optional[BaseException],
+ Optional[types.TracebackType]]) -> None:
"""Set the given ``exc_info`` as the `Future`'s exception.
Understands both `asyncio.Future` and Tornado's extensions to
"""
if hasattr(future, 'set_exc_info'):
# Tornado's Future
- future.set_exc_info(exc_info)
+ future.set_exc_info(exc_info) # type: ignore
else:
# asyncio.Future
+ if exc_info[1] is None:
+ raise Exception("future_set_exc_info called with no exception")
future.set_exception(exc_info[1])
-def future_add_done_callback(future, callback):
+def future_add_done_callback(future: 'Future[_T]',
+ callback: Callable[['Future[_T]'], None]) -> None:
"""Arrange to call ``callback`` when ``future`` is complete.
``callback`` is invoked with one argument, the ``future``.