- libgnutls-dev
jobs:
include:
+ # 3.5.2 is interesting because it's the version in ubuntu 16.04, and due to python's
+ # "provisional feature" rules there are significant differences between patch
+ # versions for asyncio and typing.
+ - python: 3.5.2
- python: 3.5
- python: 3.6
- python: pypy3.5-5.10.1
- if [[ $TRAVIS_PYTHON_VERSION != nightly && $TRAVIS_PYTHON_VERSION != 'pypy'* ]]; then export RUN_COVERAGE=1; fi
- if [[ "$RUN_COVERAGE" == 1 ]]; then export TARGET="-m coverage run $TARGET"; fi
# See comments in tox.ini. Disabled on py35 because ResourceWarnings are too noisy there.
- - if [[ $TRAVIS_PYTHON_VERSION != '3.5' ]]; then export PYTHONWARNINGS=error,ignore:::site; fi
+ - if [[ $TRAVIS_PYTHON_VERSION != '3.5'* ]]; then export PYTHONWARNINGS=error,ignore:::site; fi
- python -bb $TARGET
- python -O $TARGET
- LANG=C python $TARGET
def future_set_result_unless_cancelled(
- future: Union["futures.Future[_T]", "Future[_T]"], value: _T
+ future: "Union[futures.Future[_T], Future[_T]]", value: _T
) -> None:
"""Set the given ``value`` as the `Future`'s result, if not cancelled.
def future_set_exception_unless_cancelled(
- future: Union["futures.Future[_T]", "Future[_T]"], exc: BaseException
+ future: "Union[futures.Future[_T], Future[_T]]", exc: BaseException
) -> None:
"""Set the given ``exc`` as the `Future`'s exception.
def future_set_exc_info(
- future: Union["futures.Future[_T]", "Future[_T]"],
+ future: "Union[futures.Future[_T], Future[_T]]",
exc_info: Tuple[
Optional[type], Optional[BaseException], Optional[types.TracebackType]
],
def future_add_done_callback( # noqa: F811
- future: Union["futures.Future[_T]", "Future[_T]"], callback: Callable[..., None]
+ future: "Union[futures.Future[_T], Future[_T]]", callback: Callable[..., None]
) -> None:
"""Arrange to call ``callback`` when ``future`` is complete.
def multi(
children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
- quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
-) -> Union["Future[List]", "Future[Dict]"]:
+ quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
+) -> "Union[Future[List], Future[Dict]]":
"""Runs multiple asynchronous operations in parallel.
``children`` may either be a list or a dict whose values are
def multi_future(
children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
- quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
-) -> Union["Future[List]", "Future[Dict]"]:
+ quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
+) -> "Union[Future[List], Future[Dict]]":
"""Wait for multiple asynchronous futures in parallel.
Since Tornado 6.0, this function is exactly the same as `multi`.
def with_timeout(
timeout: Union[float, datetime.timedelta],
future: _Yieldable,
- quiet_exceptions: Union[Type[Exception], Tuple[Type[Exception], ...]] = (),
+ quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
) -> Future:
"""Wraps a `.Future` (or other yieldable object) in a timeout.
def __exit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: types.TracebackType,
) -> None:
return connection_header == "keep-alive"
return False
- def _finish_request(self, future: Optional["Future[None]"]) -> None:
+ def _finish_request(self, future: "Optional[Future[None]]") -> None:
self._clear_callbacks()
if not self.is_client and self._disconnect_on_finish:
self.close()
@classmethod
def configure(
- cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any
+ cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
) -> None:
"""Configures the `AsyncHTTPClient` subclass to use.
@classmethod
def configure(
- cls, impl: Union[None, str, Type[Configurable]], **kwargs: Any
+ cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
) -> None:
if asyncio is not None:
from tornado.platform.asyncio import BaseAsyncIOLoop
def add_future(
self,
- future: Union["Future[_T]", "concurrent.futures.Future[_T]"],
+ future: "Union[Future[_T], concurrent.futures.Future[_T]]",
callback: Callable[["Future[_T]"], None],
) -> None:
"""Schedules a callback on the ``IOLoop`` when the given
Optional,
Awaitable,
Callable,
- Type,
Pattern,
Any,
Dict,
from types import TracebackType
if typing.TYPE_CHECKING:
- from typing import Deque, List # noqa: F401
+ from typing import Deque, List, Type # noqa: F401
_IOStreamType = TypeVar("_IOStreamType", bound="IOStream")
bool,
BaseException,
Tuple[
- Optional[Type[BaseException]],
+ "Optional[Type[BaseException]]",
Optional[BaseException],
Optional[TracebackType],
],
def __exit__(
self,
- exc_type: Optional[Type[BaseException]],
+ exc_type: "Optional[Type[BaseException]]",
exc_val: Optional[BaseException],
exc_tb: Optional[types.TracebackType],
) -> None:
def __exit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
traceback: Optional[types.TracebackType],
) -> None:
async def __aexit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[types.TracebackType],
) -> None:
def __exit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[types.TracebackType],
) -> None:
async def __aexit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[types.TracebackType],
) -> None:
def _handle_exception(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> bool:
from tornado.web import Application
import typing
-from typing import Tuple, Any, Callable, Type, Dict, Union, Coroutine, Optional
+from typing import Tuple, Any, Callable, Type, Dict, Union, Optional
from types import TracebackType
if typing.TYPE_CHECKING:
+ # Coroutine wasn't added to typing until 3.5.3, so only import it
+ # when mypy is running and use forward references.
+ from typing import Coroutine # noqa: F401
+
_ExcInfoTuple = Tuple[
Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]
]
@typing.overload
def gen_test(
*, timeout: float = None
-) -> Callable[[Callable[..., Union[Generator, Coroutine]]], Callable[..., None]]:
+) -> Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]]:
pass
@typing.overload # noqa: F811
-def gen_test(func: Callable[..., Union[Generator, Coroutine]]) -> Callable[..., None]:
+def gen_test(func: Callable[..., Union[Generator, "Coroutine"]]) -> Callable[..., None]:
pass
def gen_test( # noqa: F811
- func: Callable[..., Union[Generator, Coroutine]] = None, timeout: float = None
+ func: Callable[..., Union[Generator, "Coroutine"]] = None, timeout: float = None
) -> Union[
Callable[..., None],
- Callable[[Callable[..., Union[Generator, Coroutine]]], Callable[..., None]],
+ Callable[[Callable[..., Union[Generator, "Coroutine"]]], Callable[..., None]],
]:
"""Testing equivalent of ``@gen.coroutine``, to be applied to test methods.
if timeout is None:
timeout = get_async_test_timeout()
- def wrap(f: Callable[..., Union[Generator, Coroutine]]) -> Callable[..., None]:
+ def wrap(f: Callable[..., Union[Generator, "Coroutine"]]) -> Callable[..., None]:
# Stack up several decorators to allow us to access the generator
# object itself. In the innermost wrapper, we capture the generator
# and save it in an attribute of self. Next, we run the wrapped
def __exit__(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
def log_exception(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
def _run_callback(
self, callback: Callable, *args: Any, **kwargs: Any
- ) -> Optional["Future[Any]"]:
+ ) -> "Optional[Future[Any]]":
"""Runs the given callback with exception handling.
If the callback is a coroutine, returns its Future. On error, aborts the
if handled_future is not None:
await handled_future
- def _handle_message(self, opcode: int, data: bytes) -> Optional["Future[None]"]:
+ def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]":
"""Execute on_message, returning its Future if it is a coroutine."""
if self.client_terminated:
return None
def log_exception(
self,
- typ: Optional[Type[BaseException]],
+ typ: "Optional[Type[BaseException]]",
value: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
from tornado import httputil
from tornado.log import access_log
-from typing import List, Tuple, Optional, Type, Callable, Any, Dict, Text
+from typing import List, Tuple, Optional, Callable, Any, Dict, Text
from types import TracebackType
import typing
if typing.TYPE_CHECKING:
+ from typing import Type # noqa: F401
from wsgiref.types import WSGIApplication as WSGIAppType # noqa: F401
headers: List[Tuple[str, str]],
exc_info: Optional[
Tuple[
- Optional[Type[BaseException]],
+ "Optional[Type[BaseException]]",
Optional[BaseException],
Optional[TracebackType],
]