# License for the specific language governing permissions and limitations
# under the License.
+# type: ignore
+
import os
import platform
import sys
) -> Dict[str, Any]:
handler = cast(RequestHandler, self)
if b"is_valid:true" not in response.body:
- raise AuthError("Invalid OpenID response: %s" % response.body)
+ raise AuthError("Invalid OpenID response: %r" % response.body)
# Make sure we got back at least an email from attribute exchange
ax_ns = None
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
- scope: Optional[str] = None,
+ scope: Optional[List[str]] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
# Unfortunately the errno returned in this case does not
# appear to be consistent, so we can't easily check for
# this error specifically.
- os.spawnv( # type: ignore
- os.P_NOWAIT, sys.executable, [sys.executable] + argv
+ os.spawnv(
+ os.P_NOWAIT, sys.executable, [sys.executable] + argv # type: ignore
)
# At this point the IOLoop has been closed and finally
# blocks will experience errors if we allow the stack to
from tornado.ioloop import IOLoop
from tornado.util import Configurable
-from typing import Type, Any, Union, Dict, Callable, Optional, cast, Awaitable
+from typing import Type, Any, Union, Dict, Callable, Optional, cast
class HTTPClient(object):
request: Union[str, "HTTPRequest"],
raise_error: bool = True,
**kwargs: Any
- ) -> Awaitable["HTTPResponse"]:
+ ) -> "Future[HTTPResponse]":
"""Executes a request, asynchronously returning an `HTTPResponse`.
The request may be either a string URL or an `HTTPRequest` object.
"""
# Note that some of these attributes go through property setters
# defined below.
- self.headers = headers
+ self.headers = headers # type: ignore
if if_modified_since:
self.headers["If-Modified-Since"] = httputil.format_timestamp(
if_modified_since
self.proxy_auth_mode = proxy_auth_mode
self.url = url
self.method = method
- self.body = body
+ self.body = body # type: ignore
self.body_producer = body_producer
self.auth_username = auth_username
self.auth_password = auth_password
def cookies(self) -> Dict[str, http.cookies.Morsel]:
"""A dictionary of ``http.cookies.Morsel`` objects."""
if not hasattr(self, "_cookies"):
- self._cookies = http.cookies.SimpleCookie()
+ self._cookies = (
+ http.cookies.SimpleCookie()
+ ) # type: http.cookies.SimpleCookie
if "Cookie" in self.headers:
try:
parsed = parse_cookie(self.headers["Cookie"])
del data
def connect(
- self: _IOStreamType, address: tuple, server_hostname: Optional[str] = None
+ self: _IOStreamType, address: Any, server_hostname: Optional[str] = None
) -> "Future[_IOStreamType]":
"""Connects the socket to a remote address without blocking.
results = []
for fam, socktype, proto, canonname, address in addrinfo:
results.append((fam, address))
- return results
+ return results # type: ignore
class DefaultExecutorResolver(Resolver):
except NotImplementedError:
pass
try:
- return os.sysconf("SC_NPROCESSORS_CONF")
+ return os.sysconf("SC_NPROCESSORS_CONF") # type: ignore
except (AttributeError, ValueError):
pass
gen_log.error("Could not detect number of processors; assuming 1")
Availability: Unix
"""
+ if sys.platform == "win32":
+ # The exact form of this condition matters to mypy; it understands
+ # if but not assert in this context.
+ raise Exception("fork not available on windows")
if max_restarts is None:
max_restarts = 100
@classmethod
def _try_cleanup_process(cls, pid: int) -> None:
try:
- ret_pid, status = os.waitpid(pid, os.WNOHANG)
+ ret_pid, status = os.waitpid(pid, os.WNOHANG) # type: ignore
except ChildProcessError:
return
if ret_pid == 0:
subproc.io_loop.add_callback_from_signal(subproc._set_returncode, status)
def _set_returncode(self, status: int) -> None:
- if os.WIFSIGNALED(status):
- self.returncode = -os.WTERMSIG(status)
+ if sys.platform == "win32":
+ self.returncode = -1
else:
- assert os.WIFEXITED(status)
- self.returncode = os.WEXITSTATUS(status)
+ if os.WIFSIGNALED(status):
+ self.returncode = -os.WTERMSIG(status)
+ else:
+ assert os.WIFEXITED(status)
+ self.returncode = os.WEXITSTATUS(status)
# We've taken over wait() duty from the subprocess.Popen
# object. If we don't inform it of the process's return code,
# it will log a warning at destruction in python 3.6+.
self.assertIsInstance(self.executor.submit(accessor).result(), expected_type)
# Clean up to silence leak warnings. Always use asyncio since
# IOLoop doesn't (currently) close the underlying loop.
- self.executor.submit(lambda: asyncio.get_event_loop().close()).result()
+ self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore
def test_asyncio_accessor(self):
self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
raise Exception("user is None")
self.finish(user)
return
- res = self.authenticate_redirect()
+ res = self.authenticate_redirect() # type: ignore
assert res is None
self._OAUTH_AUTHORIZE_URL = test.get_url("/oauth2/server/authorize")
def get(self):
- res = self.authorize_redirect()
+ res = self.authorize_redirect() # type: ignore
assert res is None
)
self.write(user)
else:
- yield self.authorize_redirect(
+ self.authorize_redirect(
redirect_uri=self.request.full_url(),
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "read_stream,offline_access"},
user["access_token"] = access["access_token"]
self.write(user)
else:
- yield self.authorize_redirect(
+ self.authorize_redirect(
redirect_uri=self._OAUTH_REDIRECT_URI,
client_id=self.settings["google_oauth"]["key"],
client_secret=self.settings["google_oauth"]["secret"],
import logging
import re
import socket
+import typing
import unittest
from tornado.concurrent import (
class ClientTestMixin(object):
+ client_class = None # type: typing.Callable
+
def setUp(self):
super(ClientTestMixin, self).setUp() # type: ignore
self.server = CapServer()
self.server.stop()
super(ClientTestMixin, self).tearDown() # type: ignore
- def test_future(self):
+ def test_future(self: typing.Any):
future = self.client.capitalize("hello")
self.io_loop.add_future(future, self.stop)
self.wait()
self.assertEqual(future.result(), "HELLO")
- def test_future_error(self):
+ def test_future_error(self: typing.Any):
future = self.client.capitalize("HELLO")
self.io_loop.add_future(future, self.stop)
self.wait()
- self.assertRaisesRegexp(CapError, "already capitalized", future.result)
+ self.assertRaisesRegexp(CapError, "already capitalized", future.result) # type: ignore
- def test_generator(self):
+ def test_generator(self: typing.Any):
@gen.coroutine
def f():
result = yield self.client.capitalize("hello")
self.io_loop.run_sync(f)
- def test_generator_error(self):
+ def test_generator_error(self: typing.Any):
@gen.coroutine
def f():
with self.assertRaisesRegexp(CapError, "already capitalized"):
import socket
+import typing
from tornado.http1connection import HTTP1Connection
from tornado.httputil import HTTPMessageDelegate
class HTTP1ConnectionTest(AsyncTestCase):
+ code = None # type: typing.Optional[int]
+
def setUp(self):
super(HTTP1ConnectionTest, self).setUp()
self.asyncSetUp()
SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ("OTHER",) # type: ignore
def method(self):
+ assert self.request.method is not None
self.write(self.request.method)
get = head = post = put = delete = options = patch = other = method # type: ignore
self.assertEqual(response.code, 200)
self.assertEqual(response.headers["Content-Type"], "text/plain")
self.assertEqual(response.body, b"Hello world!")
+ assert response.request_time is not None
self.assertEqual(int(response.request_time), 0)
response = self.fetch("/hello?name=Ben")
# important thing is that they don't fall back to basic auth
# on an unknown mode.
with ExpectLog(gen_log, "uncaught exception", required=False):
- with self.assertRaises((ValueError, HTTPError)):
+ with self.assertRaises((ValueError, HTTPError)): # type: ignore
self.fetch(
"/auth",
auth_username="Aladdin",
def test_future_http_error(self):
with self.assertRaises(HTTPError) as context:
yield self.http_client.fetch(self.get_url("/notfound"))
+ assert context.exception is not None
+ assert context.exception.response is not None
self.assertEqual(context.exception.code, 404)
self.assertEqual(context.exception.response.code, 404)
response = yield self.http_client.fetch(request)
self.assertEqual(response.code, 200)
- with self.assertRaises((ValueError, HTTPError)) as context:
+ with self.assertRaises((ValueError, HTTPError)) as context: # type: ignore
request = HTTPRequest(url, network_interface="not-interface-or-ip")
yield self.http_client.fetch(request)
self.assertIn("not-interface-or-ip", str(context.exception))
self.assertLess(response.request_time, 1.0)
# A very crude check to make sure that start_time is based on
# wall time and not the monotonic clock.
+ assert response.start_time is not None
self.assertLess(abs(response.start_time - start_time), 1.0)
for k, v in response.time_info.items():
class HTTPResponseTestCase(unittest.TestCase):
def test_str(self):
response = HTTPResponse( # type: ignore
- HTTPRequest("http://example.com"), 200, headers={}, buffer=BytesIO()
+ HTTPRequest("http://example.com"), 200, buffer=BytesIO()
)
s = str(response)
self.assertTrue(s.startswith("HTTPResponse("))
class HandlerBaseTestCase(AsyncHTTPTestCase):
+ Handler = None
+
def get_app(self):
return Application([("/", self.__class__.Handler)])
def get_ssl_version(self):
raise NotImplementedError()
- def test_ssl(self):
+ def test_ssl(self: typing.Any):
response = self.fetch("/")
self.assertEqual(response.body, b"Hello world")
- def test_large_post(self):
+ def test_large_post(self: typing.Any):
response = self.fetch("/", method="POST", body="A" * 5000)
self.assertEqual(response.body, b"Got 5000 bytes in POST")
- def test_non_ssl_request(self):
+ def test_non_ssl_request(self: typing.Any):
# Make sure the server closes the connection when it gets a non-ssl
# connection, rather than waiting for a timeout or otherwise
# misbehaving.
with ExpectLog(gen_log, "(SSL Error|uncaught exception)"):
with ExpectLog(gen_log, "Uncaught exception", required=False):
- with self.assertRaises((IOError, HTTPError)):
+ with self.assertRaises((IOError, HTTPError)): # type: ignore
self.fetch(
self.get_url("/").replace("https:", "http:"),
request_timeout=3600,
raise_error=True,
)
- def test_error_logging(self):
+ def test_error_logging(self: typing.Any):
# No stack traces are logged for SSL errors.
with ExpectLog(gen_log, "SSL Error") as expect_log:
- with self.assertRaises((IOError, HTTPError)):
+ with self.assertRaises((IOError, HTTPError)): # type: ignore
self.fetch(
self.get_url("/").replace("https:", "http:"), raise_error=True
)
self.close()
-class GzipBaseTest(object):
+class GzipBaseTest(AsyncHTTPTestCase):
def get_app(self):
return Application([("/", EchoHandler)])
self.bytes_read = 0
def prepare(self):
+ conn = typing.cast(HTTP1Connection, self.request.connection)
if "expected_size" in self.request.arguments:
- self.request.connection.set_max_body_size(
- int(self.get_argument("expected_size"))
- )
+ conn.set_max_body_size(int(self.get_argument("expected_size")))
if "body_timeout" in self.request.arguments:
- self.request.connection.set_body_timeout(
- float(self.get_argument("body_timeout"))
- )
+ conn.set_body_timeout(float(self.get_argument("body_timeout")))
def data_received(self, data):
self.bytes_read += len(data)
test.calls += 1
old_add_callback(callback, *args, **kwargs)
- loop.add_callback = types.MethodType(add_callback, loop)
- loop.add_callback(lambda: {})
- loop.add_callback(lambda: [])
+ loop.add_callback = types.MethodType(add_callback, loop) # type: ignore
+ loop.add_callback(lambda: {}) # type: ignore
+ loop.add_callback(lambda: []) # type: ignore
loop.add_timeout(datetime.timedelta(milliseconds=50), loop.stop)
loop.start()
self.assertLess(self.calls, 10)
self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
self.io_loop.add_timeout(
- self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf")
+ self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf") # type: ignore
)
self.wait()
self.io_loop.remove_handler(client.fileno())
# automatically set as current.
class TestIOLoopCurrent(unittest.TestCase):
def setUp(self):
- self.io_loop = None
+ self.io_loop = None # type: typing.Optional[IOLoop]
IOLoop.clear_current()
def tearDown(self):
def f():
self.current_io_loop = IOLoop.current()
+ assert self.io_loop is not None
self.io_loop.stop()
self.io_loop.add_callback(f)
import random
import socket
import ssl
+import typing
from unittest import mock
import unittest
def get_app(self):
return Application([("/", HelloHandler)])
- def test_connection_closed(self):
+ def test_connection_closed(self: typing.Any):
# When a server sends a response and then closes the connection,
# the client must be allowed to read the data before the IOStream
# closes itself. Epoll reports closed connections with a separate
response.rethrow()
@gen_test
- def test_read_until_close(self):
+ def test_read_until_close(self: typing.Any):
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.assertTrue(data.endswith(b"Hello"))
@gen_test
- def test_read_zero_bytes(self):
+ def test_read_zero_bytes(self: typing.Any):
self.stream = self._make_client_iostream()
yield self.stream.connect(("127.0.0.1", self.get_http_port()))
self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.stream.close()
@gen_test
- def test_write_while_connecting(self):
+ def test_write_while_connecting(self: typing.Any):
stream = self._make_client_iostream()
connect_fut = stream.connect(("127.0.0.1", self.get_http_port()))
# unlike the previous tests, try to write before the connection
stream.close()
@gen_test
- def test_future_interface(self):
+ def test_future_interface(self: typing.Any):
"""Basic test of IOStream's ability to return Futures."""
stream = self._make_client_iostream()
connect_result = yield stream.connect(("127.0.0.1", self.get_http_port()))
stream.close()
@gen_test
- def test_future_close_while_reading(self):
+ def test_future_close_while_reading(self: typing.Any):
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
stream.close()
@gen_test
- def test_future_read_until_close(self):
+ def test_future_read_until_close(self: typing.Any):
# Ensure that the data comes through before the StreamClosedError.
stream = self._make_client_iostream()
yield stream.connect(("127.0.0.1", self.get_http_port()))
rs.close()
@gen_test
- def test_future_delayed_close_callback(self):
+ def test_future_delayed_close_callback(self: typing.Any):
# Same as test_delayed_close_callback, but with the future interface.
rs, ws = yield self.make_iostream_pair()
rs.close()
@gen_test
- def test_close_buffered_data(self):
+ def test_close_buffered_data(self: typing.Any):
# Similar to the previous test, but with data stored in the OS's
# socket buffers instead of the IOStream's read buffer. Out-of-band
# close notifications must be delayed until all data has been
rs.close()
@gen_test
- def test_read_until_close_after_close(self):
+ def test_read_until_close_after_close(self: typing.Any):
# Similar to test_delayed_close_callback, but read_until_close takes
# a separate code path so test it separately.
rs, ws = yield self.make_iostream_pair()
rs.close()
@gen_test
- def test_large_read_until(self):
+ def test_large_read_until(self: typing.Any):
# Performance test: read_until used to have a quadratic component
# so a read_until of 4MB would take 8 seconds; now it takes 0.25
# seconds.
rs.close()
@gen_test
- def test_close_callback_with_pending_read(self):
+ def test_close_callback_with_pending_read(self: typing.Any):
# Regression test for a bug that was introduced in 2.3
# where the IOStream._close_callback would never be called
# if there were pending reads.
rs.close()
@gen_test
- def test_future_close_callback(self):
+ def test_future_close_callback(self: typing.Any):
# Regression test for interaction between the Future read interfaces
# and IOStream._maybe_add_error_listener.
rs, ws = yield self.make_iostream_pair()
ws.close()
@gen_test
- def test_write_memoryview(self):
+ def test_write_memoryview(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
try:
fut = rs.read_bytes(4)
rs.close()
@gen_test
- def test_read_bytes_partial(self):
+ def test_read_bytes_partial(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
try:
# Ask for more than is available with partial=True
rs.close()
@gen_test
- def test_read_until_max_bytes(self):
+ def test_read_until_max_bytes(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_max_bytes_inline(self):
+ def test_read_until_max_bytes_inline(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_max_bytes_ignores_extra(self):
+ def test_read_until_max_bytes_ignores_extra(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_regex_max_bytes(self):
+ def test_read_until_regex_max_bytes(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_read_until_regex_max_bytes_inline(self):
+ def test_read_until_regex_max_bytes_inline(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
closed = Event()
rs.set_close_callback(closed.set)
rs.close()
@gen_test
- def test_small_reads_from_large_buffer(self):
+ def test_small_reads_from_large_buffer(self: typing.Any):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
rs.close()
@gen_test
- def test_small_read_untils_from_large_buffer(self):
+ def test_small_read_untils_from_large_buffer(self: typing.Any):
# 10KB buffer size, 100KB available to read.
# Read 1KB at a time and make sure that the buffer is not eagerly
# filled.
ws.close()
@gen_test
- def test_read_into(self):
+ def test_read_into(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
def sleep_some():
rs.close()
@gen_test
- def test_read_into_partial(self):
+ def test_read_into_partial(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
try:
rs.close()
@gen_test
- def test_read_into_zero_bytes(self):
+ def test_read_into_zero_bytes(self: typing.Any):
rs, ws = yield self.make_iostream_pair()
try:
buf = bytearray()
raise NotImplementedError()
@gen.coroutine
- def make_iostream_pair(self, **kwargs):
+ def make_iostream_pair(self: typing.Any, **kwargs):
listener, port = bind_unused_port()
server_stream_fut = Future() # type: Future[IOStream]
raise gen.Return((server_stream, client_stream))
@gen_test
- def test_connection_refused(self):
+ def test_connection_refused(self: typing.Any):
# When a connection is refused, the connect callback should not
# be run. (The kqueue IOLoop used to behave differently from the
# epoll IOLoop in this respect)
self.assertTrue(isinstance(stream.error, ConnectionRefusedError), stream.error)
@gen_test
- def test_gaierror(self):
+ def test_gaierror(self: typing.Any):
# Test that IOStream sets its exc_info on getaddrinfo error.
# It's difficult to reliably trigger a getaddrinfo error;
# some resolvers own't even return errors for malformed names,
self.assertTrue(isinstance(stream.error, socket.gaierror))
@gen_test
- def test_read_until_close_with_error(self):
+ def test_read_until_close_with_error(self: typing.Any):
server, client = yield self.make_iostream_pair()
try:
with mock.patch(
@skipIfNonUnix
@skipPypy3V58
@gen_test
- def test_inline_read_error(self):
+ def test_inline_read_error(self: typing.Any):
# An error on an inline read is raised without logging (on the
# assumption that it will eventually be noticed or logged further
# up the stack).
self.server_stream = None
self.server_accepted = Future() # type: Future[None]
netutil.add_accept_handler(self.listener, self.accept)
- self.client_stream = IOStream(socket.socket())
+ self.client_stream = IOStream(
+ socket.socket()
+ ) # type: typing.Optional[IOStream]
self.io_loop.add_future(
self.client_stream.connect(("127.0.0.1", self.port)), self.stop
)
@gen.coroutine
def client_send_line(self, line):
+ assert self.client_stream is not None
self.client_stream.write(line)
+ assert self.server_stream is not None
recv_line = yield self.server_stream.read_until(b"\r\n")
self.assertEqual(line, recv_line)
@gen.coroutine
def server_send_line(self, line):
+ assert self.server_stream is not None
self.server_stream.write(line)
+ assert self.client_stream is not None
recv_line = yield self.client_stream.read_until(b"\r\n")
self.assertEqual(line, recv_line)
def client_start_tls(self, ssl_options=None, server_hostname=None):
+ assert self.client_stream is not None
client_stream = self.client_stream
self.client_stream = None
return client_stream.start_tls(False, ssl_options, server_hostname)
def server_start_tls(self, ssl_options=None):
+ assert self.server_stream is not None
server_stream = self.server_stream
self.server_stream = None
return server_stream.start_tls(True, ssl_options)
class _ResolverTestMixin(object):
+ resolver = None # type: typing.Any
+
@gen_test
- def test_localhost(self):
+ def test_localhost(self: typing.Any):
addrinfo = yield self.resolver.resolve("localhost", 80, socket.AF_UNSPEC)
self.assertIn((socket.AF_INET, ("127.0.0.1", 80)), addrinfo)
# It is impossible to quickly and consistently generate an error in name
# resolution, so test this case separately, using mocks as needed.
class _ResolverErrorTestMixin(object):
+ resolver = None # type: typing.Any
+
@gen_test
- def test_bad_host(self):
+ def test_bad_host(self: typing.Any):
with self.assertRaises(IOError):
yield self.resolver.resolve("an invalid domain", 80, socket.AF_UNSPEC)
def test_blocking_get_wait(self):
q = queues.Queue() # type: queues.Queue[int]
q.put(0)
- self.io_loop.call_later(0.01, q.put, 1)
- self.io_loop.call_later(0.02, q.put, 2)
+ self.io_loop.call_later(0.01, q.put_nowait, 1)
+ self.io_loop.call_later(0.02, q.put_nowait, 2)
self.assertEqual(0, (yield q.get(timeout=timedelta(seconds=1))))
self.assertEqual(1, (yield q.get(timeout=timedelta(seconds=1))))
def test_blocking_put_wait(self):
q = queues.Queue(1) # type: queues.Queue[int]
q.put_nowait(0)
- self.io_loop.call_later(0.01, q.get)
- self.io_loop.call_later(0.02, q.get)
+
+ def get_and_discard():
+ q.get()
+
+ self.io_loop.call_later(0.01, get_and_discard)
+ self.io_loop.call_later(0.02, get_and_discard)
futures = [q.put(0), q.put(1)]
self.assertFalse(any(f.done() for f in futures))
yield futures
queue_class = queues.Queue
def test_task_done_underflow(self):
- q = self.queue_class()
+ q = self.queue_class() # type: queues.Queue
self.assertRaises(ValueError, q.task_done)
@gen_test
def test_task_done(self):
- q = self.queue_class()
+ q = self.queue_class() # type: queues.Queue
for i in range(100):
q.put_nowait(i)
@gen_test
def test_task_done_delay(self):
# Verify it is task_done(), not get(), that unblocks join().
- q = self.queue_class()
+ q = self.queue_class() # type: queues.Queue
q.put_nowait(0)
- join = q.join()
+ join = asyncio.ensure_future(q.join())
self.assertFalse(join.done())
yield q.get()
self.assertFalse(join.done())
@gen_test
def test_join_empty_queue(self):
- q = self.queue_class()
+ q = self.queue_class() # type: queues.Queue
yield q.join()
yield q.join()
@gen_test
def test_join_timeout(self):
- q = self.queue_class()
+ q = self.queue_class() # type: queues.Queue
q.put(0)
with self.assertRaises(TimeoutError):
yield q.join(timeout=timedelta(seconds=0.01))
result = super(TornadoTextTestRunner, self).run(test)
if result.skipped:
skip_reasons = set(reason for (test, reason) in result.skipped)
- self.stream.write(
+ self.stream.write( # type: ignore
textwrap.fill(
"Some tests were skipped because: %s"
% ", ".join(sorted(skip_reasons))
)
)
- self.stream.write("\n")
+ self.stream.write("\n") # type: ignore
return result
return TornadoTextTestRunner
class SimpleHTTPClientTestMixin(object):
- def get_app(self):
+ def create_client(self, **kwargs):
+ raise NotImplementedError()
+
+ def get_app(self: typing.Any):
# callable objects to finish pending /trigger requests
- self.triggers = collections.deque() # type: typing.Deque[str]
+ self.triggers = (
+ collections.deque()
+ ) # type: typing.Deque[typing.Callable[[], None]]
return Application(
[
url(
gzip=True,
)
- def test_singleton(self):
+ def test_singleton(self: typing.Any):
# Class "constructor" reuses objects on the same IOLoop
self.assertTrue(SimpleAsyncHTTPClient() is SimpleAsyncHTTPClient())
# unless force_instance is used
client2 = io_loop2.run_sync(make_client)
self.assertTrue(client1 is not client2)
- def test_connection_limit(self):
+ def test_connection_limit(self: typing.Any):
with closing(self.create_client(max_clients=2)) as client:
self.assertEqual(client.max_clients, 2)
seen = []
self.assertEqual(len(self.triggers), 0)
@gen_test
- def test_redirect_connection_limit(self):
+ def test_redirect_connection_limit(self: typing.Any):
# following redirects should not consume additional connections
with closing(self.create_client(max_clients=1)) as client:
response = yield client.fetch(self.get_url("/countdown/3"), max_redirects=3)
response.rethrow()
- def test_max_redirects(self):
+ def test_max_redirects(self: typing.Any):
response = self.fetch("/countdown/5", max_redirects=3)
self.assertEqual(302, response.code)
# We requested 5, followed three redirects for 4, 3, 2, then the last
self.assertTrue(response.effective_url.endswith("/countdown/2"))
self.assertTrue(response.headers["Location"].endswith("/countdown/1"))
- def test_header_reuse(self):
+ def test_header_reuse(self: typing.Any):
# Apps may reuse a headers object if they are only passing in constant
# headers like user-agent. The header object should not be modified.
headers = HTTPHeaders({"User-Agent": "Foo"})
self.fetch("/hello", headers=headers)
self.assertEqual(list(headers.get_all()), [("User-Agent", "Foo")])
- def test_see_other_redirect(self):
+ def test_see_other_redirect(self: typing.Any):
for code in (302, 303):
response = self.fetch("/see_other_post", method="POST", body="%d" % code)
self.assertEqual(200, response.code)
@skipOnTravis
@gen_test
- def test_connect_timeout(self):
+ def test_connect_timeout(self: typing.Any):
timeout = 0.1
cleanup_event = Event()
yield gen.sleep(0.2)
@skipOnTravis
- def test_request_timeout(self):
+ def test_request_timeout(self: typing.Any):
timeout = 0.1
if os.name == "nt":
timeout = 0.5
self.io_loop.run_sync(lambda: gen.sleep(0))
@skipIfNoIPv6
- def test_ipv6(self):
+ def test_ipv6(self: typing.Any):
[sock] = bind_sockets(0, "::1", family=socket.AF_INET6)
port = sock.getsockname()[1]
self.http_server.add_socket(sock)
response = self.fetch(url)
self.assertEqual(response.body, b"Hello world!")
- def test_multiple_content_length_accepted(self):
+ def test_multiple_content_length_accepted(self: typing.Any):
response = self.fetch("/content_length?value=2,2")
self.assertEqual(response.body, b"ok")
response = self.fetch("/content_length?value=2,%202,2")
with self.assertRaises(HTTPStreamClosedError):
self.fetch("/content_length?value=2,%202,3", raise_error=True)
- def test_head_request(self):
+ def test_head_request(self: typing.Any):
response = self.fetch("/head", method="HEAD")
self.assertEqual(response.code, 200)
self.assertEqual(response.headers["content-length"], "7")
self.assertFalse(response.body)
- def test_options_request(self):
+ def test_options_request(self: typing.Any):
response = self.fetch("/options", method="OPTIONS")
self.assertEqual(response.code, 200)
self.assertEqual(response.headers["content-length"], "2")
self.assertEqual(response.headers["access-control-allow-origin"], "*")
self.assertEqual(response.body, b"ok")
- def test_no_content(self):
+ def test_no_content(self: typing.Any):
response = self.fetch("/no_content")
self.assertEqual(response.code, 204)
# 204 status shouldn't have a content-length
# in HTTP204NoContentTestCase.
self.assertNotIn("Content-Length", response.headers)
- def test_host_header(self):
+ def test_host_header(self: typing.Any):
host_re = re.compile(b"^127.0.0.1:[0-9]+$")
response = self.fetch("/host_echo")
self.assertTrue(host_re.match(response.body))
response = self.fetch(url)
self.assertTrue(host_re.match(response.body), response.body)
- def test_connection_refused(self):
+ def test_connection_refused(self: typing.Any):
cleanup_func, port = refusing_port()
self.addCleanup(cleanup_func)
with ExpectLog(gen_log, ".*", required=False):
expected_message = os.strerror(errno.ECONNREFUSED)
self.assertTrue(expected_message in str(cm.exception), cm.exception)
- def test_queue_timeout(self):
+ def test_queue_timeout(self: typing.Any):
with closing(self.create_client(max_clients=1)) as client:
# Wait for the trigger request to block, not complete.
fut1 = client.fetch(self.get_url("/trigger"), request_timeout=10)
self.triggers.popleft()()
self.io_loop.run_sync(lambda: fut1)
- def test_no_content_length(self):
+ def test_no_content_length(self: typing.Any):
response = self.fetch("/no_content_length")
if response.body == b"HTTP/1 required":
self.skipTest("requires HTTP/1.x")
yield gen.moment
yield write(b"5678")
- def test_sync_body_producer_chunked(self):
+ def test_sync_body_producer_chunked(self: typing.Any):
response = self.fetch(
"/echo_post", method="POST", body_producer=self.sync_body_producer
)
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_sync_body_producer_content_length(self):
+ def test_sync_body_producer_content_length(self: typing.Any):
response = self.fetch(
"/echo_post",
method="POST",
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_async_body_producer_chunked(self):
+ def test_async_body_producer_chunked(self: typing.Any):
response = self.fetch(
"/echo_post", method="POST", body_producer=self.async_body_producer
)
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_async_body_producer_content_length(self):
+ def test_async_body_producer_content_length(self: typing.Any):
response = self.fetch(
"/echo_post",
method="POST",
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_native_body_producer_chunked(self):
+ def test_native_body_producer_chunked(self: typing.Any):
async def body_producer(write):
await write(b"1234")
import asyncio
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_native_body_producer_content_length(self):
+ def test_native_body_producer_content_length(self: typing.Any):
async def body_producer(write):
await write(b"1234")
import asyncio
response.rethrow()
self.assertEqual(response.body, b"12345678")
- def test_100_continue(self):
+ def test_100_continue(self: typing.Any):
response = self.fetch(
"/echo_post", method="POST", body=b"1234", expect_100_continue=True
)
self.assertEqual(response.body, b"1234")
- def test_100_continue_early_response(self):
+ def test_100_continue_early_response(self: typing.Any):
def body_producer(write):
raise Exception("should not be called")
)
self.assertEqual(response.code, 403)
- def test_streaming_follow_redirects(self):
+ def test_streaming_follow_redirects(self: typing.Any):
# When following redirects, header and streaming callbacks
# should only be called for the final result.
# TODO(bdarnell): this test belongs in httpclient_test instead of
stream = yield self.client.connect(
host, port, source_ip=source_ip, source_port=source_port
)
+ assert self.server is not None
server_stream = yield self.server.queue.get()
with closing(stream):
stream.write(b"hello")
super(ConnectorTest, self).setUp()
self.connect_futures = (
{}
- ) # type: Dict[Tuple[int, Tuple], Future[ConnectorTest.FakeStream]]
- self.streams = {} # type: Dict[Tuple, ConnectorTest.FakeStream]
+ ) # type: Dict[Tuple[int, typing.Any], Future[ConnectorTest.FakeStream]]
+ self.streams = {} # type: Dict[typing.Any, ConnectorTest.FakeStream]
self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")]
def tearDown(self):
self.new_loop.close()
def test_loop(self):
- self.assertIs(self.io_loop.asyncio_loop, self.new_loop)
+ self.assertIs(self.io_loop.asyncio_loop, self.new_loop) # type: ignore
if __name__ == "__main__":
To use, define a nested class named ``Handler``.
"""
+ Handler = None
+
def get_handlers(self):
return [("/", self.Handler)]
# don't call super.__init__
self._cookies = {} # type: typing.Dict[str, bytes]
if key_version is None:
- self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret))
+ self.application = ObjectDict( # type: ignore
+ settings=dict(cookie_secret=cookie_secret)
+ )
else:
- self.application = ObjectDict(
+ self.application = ObjectDict( # type: ignore
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
class FinalReturnTest(WebTestCase):
+ final_return = None # type: Future
+
def get_handlers(self):
test = self
class GetCookieHandler(RequestHandler):
def get(self):
- self.write(self.get_cookie("foo", "default"))
+ cookie = self.get_cookie("foo", "default")
+ assert cookie is not None
+ self.write(cookie)
class SetCookieDomainHandler(RequestHandler):
def get(self):
def test_set_cookie_expires_days(self):
response = self.fetch("/set_expires_days")
header = response.headers.get("Set-Cookie")
+ assert header is not None
match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
assert match is not None
for key in self.request.arguments:
if type(key) != str:
raise Exception("incorrect type for key: %r" % type(key))
- for value in self.request.arguments[key]:
- if type(value) != bytes:
- raise Exception("incorrect type for value: %r" % type(value))
- for value in self.get_arguments(key):
- if type(value) != unicode_type:
- raise Exception("incorrect type for value: %r" % type(value))
+ for bvalue in self.request.arguments[key]:
+ if type(bvalue) != bytes:
+ raise Exception("incorrect type for value: %r" % type(bvalue))
+ for svalue in self.get_arguments(key):
+ if type(svalue) != unicode_type:
+ raise Exception("incorrect type for value: %r" % type(svalue))
for arg in path_args:
if type(arg) != unicode_type:
raise Exception("incorrect type for path arg: %r" % type(arg))
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
- self.redirect("/", permanent=int(self.get_argument("permanent")))
+ self.redirect("/", permanent=bool(int(self.get_argument("permanent"))))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
elif self.get_argument("source", None) == "body":
method = self.get_body_argument
else:
- method = self.get_argument
+ method = self.get_argument # type: ignore
self.finish(method("foo", "default"))
def get_handlers(self):
class StaticUrlHandler(RequestHandler):
def get(self, path):
- with_v = int(self.get_argument("include_version", 1))
+ with_v = int(self.get_argument("include_version", "1"))
self.write(self.static_url(path, include_version=with_v))
class AbsoluteStaticUrlHandler(StaticUrlHandler):
def get(self):
reason = self.request.arguments.get("reason", [])
self.set_status(
- int(self.get_argument("code")), reason=reason[0] if reason else None
+ int(self.get_argument("code")),
+ reason=to_unicode(reason[0]) if reason else None,
)
def get_http_client(self):
def render(self, x):
return "In MyModule(%s) with handler value %s." % (
x,
- self.handler.value(),
+ typing.cast(UIMethodUIModuleTest.Handler, self.handler).value(),
)
loader = DictLoader(
class AllHTTPMethodsTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def method(self):
+ assert self.request.method is not None
self.write(self.request.method)
get = delete = options = post = put = method # type: ignore
return SimpleAsyncHTTPClient()
# Test all the slightly different code paths for fixed, chunked, etc bodies.
- def test_flow_control_fixed_body(self):
+ def test_flow_control_fixed_body(self: typing.Any):
response = self.fetch("/", body="abcdefghijklmnopqrstuvwxyz", method="POST")
response.rethrow()
self.assertEqual(
),
)
- def test_flow_control_chunked_body(self):
+ def test_flow_control_chunked_body(self: typing.Any):
chunks = [b"abcd", b"efgh", b"ijkl"]
@gen.coroutine
),
)
- def test_flow_control_compressed_body(self):
+ def test_flow_control_compressed_body(self: typing.Any):
bytesio = BytesIO()
gzip_file = gzip.GzipFile(mode="w", fileobj=bytesio)
gzip_file.write(b"abcdefghijklmnopqrstuvwxyz")
# server should respond gracefully (without logging errors
# because we were unable to write out as many bytes as
# Content-Length said we would)
- self.request.connection.stream.close()
+ self.request.connection.stream.close() # type: ignore
self.write("hello")
else:
# TODO: add a HTTP2-compatible version of this test.
self.write("requires HTTP/1.x")
def test_client_close(self):
- with self.assertRaises((HTTPClientError, unittest.SkipTest)):
+ with self.assertRaises((HTTPClientError, unittest.SkipTest)): # type: ignore
response = self.fetch("/", raise_error=True)
if response.body == b"requires HTTP/1.x":
self.skipTest("requires HTTP/1.x")
self.assertIn("httponly;", response.headers["Set-Cookie"].lower())
self.assertIn("expires=", response.headers["Set-Cookie"].lower())
header = response.headers.get("Set-Cookie")
+ assert header is not None
match = re.match(".*; expires=(?P<expires>.+);.*", header)
assert match is not None
import asyncio
import functools
import traceback
+import typing
import unittest
from tornado.concurrent import Future
try:
# In a websocket context, many RequestHandler methods
# raise RuntimeErrors.
- method()
+ method() # type: ignore
raise Exception("did not get expected exception")
except RuntimeError:
pass
def get_client_compression_options(self):
return None
+ def verify_wire_bytes(self, bytes_in: int, bytes_out: int) -> None:
+ raise NotImplementedError()
+
@gen_test
- def test_message_sizes(self):
+ def test_message_sizes(self: typing.Any):
ws = yield self.ws_connect(
"/echo", compression_options=self.get_client_compression_options()
)
self.verify_wire_bytes(ws.protocol._wire_bytes_in, ws.protocol._wire_bytes_out)
@gen_test
- def test_size_limit(self):
+ def test_size_limit(self: typing.Any):
ws = yield self.ws_connect(
"/limited", compression_options=self.get_client_compression_options()
)
class UncompressedTestMixin(CompressionTestMixin):
"""Specialization of CompressionTestMixin when we expect no compression."""
- def verify_wire_bytes(self, bytes_in, bytes_out):
+ def verify_wire_bytes(self: typing.Any, bytes_in, bytes_out):
# Bytes out includes the 4-byte mask key per message.
self.assertEqual(bytes_out, 3 * (len(self.MESSAGE) + 6))
self.assertEqual(bytes_in, 3 * (len(self.MESSAGE) + 2))
class MaskFunctionMixin(object):
# Subclasses should define self.mask(mask, data)
- def test_mask(self):
+ def mask(self, mask: bytes, data: bytes) -> bytes:
+ raise NotImplementedError()
+
+ def test_mask(self: typing.Any):
self.assertEqual(self.mask(b"abcd", b""), b"")
self.assertEqual(self.mask(b"abcd", b"b"), b"\x03")
self.assertEqual(self.mask(b"abcd", b"54321"), b"TVPVP")
self,
condition: Optional[Callable[..., bool]] = None,
timeout: Optional[float] = None,
- ) -> None:
+ ) -> Any:
"""Runs the `.IOLoop` until stop is called or timeout has passed.
In the event of a timeout, an exception will be thrown. The
# Don't let us accidentally inject bad stuff
raise ValueError("Invalid cookie %r: %r" % (name, value))
if not hasattr(self, "_new_cookie"):
- self._new_cookie = http.cookies.SimpleCookie()
+ self._new_cookie = (
+ http.cookies.SimpleCookie()
+ ) # type: http.cookies.SimpleCookie
if name in self._new_cookie:
del self._new_cookie[name]
self._new_cookie[name] = value
args = [value.status_code, self._request_summary()] + list(value.args)
gen_log.warning(format, *args)
else:
- app_log.error( # type: ignore
+ app_log.error(
"Uncaught exception %s\n%r",
self._request_summary(),
self.request,
- exc_info=(typ, value, tb),
+ exc_info=(typ, value, tb), # type: ignore
)
def _ui_module(self, name: str, module: Type["UIModule"]) -> Callable[..., str]:
rule = super(_ApplicationRouter, self).process_rule(rule)
if isinstance(rule.target, (list, tuple)):
- rule.target = _ApplicationRouter( # type: ignore
- self.application, rule.target
+ rule.target = _ApplicationRouter(
+ self.application, rule.target # type: ignore
)
return rule
self.set_status(426, "Upgrade Required")
self.set_header("Sec-WebSocket-Version", "7, 8, 13")
- stream = None
-
@property
def ping_interval(self) -> Optional[float]:
"""The interval for websocket keep-alive pings.
docs: -r{toxinidir}/docs/requirements.txt
lint: flake8
lint: black==19.10b0
- lint: mypy==0.701
+ lint: mypy==0.740
setenv =
# Treat the extension as mandatory in testing (but not on pypy)