black
flake8
-mypy
+mypy>=0.941
+types-pycurl
tox
filelock==3.0.12
flake8==3.8.4
mccabe==0.6.1
+mypy==0.941
mypy-extensions==0.4.3
-mypy==0.790
packaging==20.4
pathspec==0.8.0
pluggy==0.13.1
regex==2020.10.28
six==1.15.0
toml==0.10.1
+tomli==2.0.1
tox==3.20.1
typed-ast==1.4.1
-typing-extensions==3.7.4.3
+types-pycurl==7.44.7
+typing-extensions==4.1.1
virtualenv==20.1.0
def full_url(self) -> str:
"""Reconstructs the full URL for this request."""
- return self.protocol + "://" + self.host + self.uri
+ return self.protocol + "://" + self.host + self.uri # type: ignore[operator]
def request_time(self) -> float:
"""Returns the amount of time it took for this request to execute."""
# 3) msgfmt tornado_test.po -o tornado_test.mo
# 4) Put the file in the proper location: $LANG/LC_MESSAGES
-_("school")
-pgettext("law", "right")
-pgettext("good", "right")
-pgettext("organization", "club", "clubs", 1)
-pgettext("stick", "club", "clubs", 1)
+_("school") # type: ignore[name-defined]
+pgettext("law", "right") # type: ignore[name-defined]
+pgettext("good", "right") # type: ignore[name-defined]
+pgettext("organization", "club", "clubs", 1) # type: ignore[name-defined]
+pgettext("stick", "club", "clubs", 1) # type: ignore[name-defined]
class InvalidGzipHandler(RequestHandler):
- def get(self):
+ def get(self) -> None:
# set Content-Encoding manually to avoid automatic gzip encoding
self.set_header("Content-Type", "text/plain")
self.set_header("Content-Encoding", "gzip")
# Triggering the potential bug seems to depend on input length.
# This length is taken from the bad-response example reported in
# https://github.com/tornadoweb/tornado/pull/2875 (uncompressed).
- body = "".join("Hello World {}\n".format(i) for i in range(9000))[:149051]
- body = gzip.compress(body.encode(), compresslevel=6) + b"\00"
+ text = "".join("Hello World {}\n".format(i) for i in range(9000))[:149051]
+ body = gzip.compress(text.encode(), compresslevel=6) + b"\00"
self.write(body)
from concurrent.futures import ThreadPoolExecutor
from concurrent import futures
+from collections.abc import Generator
import contextlib
import datetime
import functools
skipIfNonUnix,
skipOnTravis,
)
+from tornado.concurrent import Future
import typing
pc.stop()
self.assertEqual(count, 3)
- def test_periodic_coro(self):
+ def test_periodic_coro(self) -> None:
counts = [0, 0]
- pc = None
@gen.coroutine
- def callback() -> None:
+ def callback() -> "Generator[Future[None], object, None]":
counts[0] += 1
yield gen.sleep(0.025)
counts[1] += 1
self.assertEqual(counts[0], 3)
self.assertEqual(counts[1], 3)
- def test_periodic_async(self):
+ def test_periodic_async(self) -> None:
counts = [0, 0]
- pc = None
async def callback() -> None:
counts[0] += 1
# windows, making this check redundant with skipIfNonUnix, but
# we sometimes enable it on other platforms for testing.
io_loop = IOLoop.current()
- if isinstance(io_loop.selector_loop, AddThreadSelectorEventLoop):
+ if isinstance(
+ io_loop.selector_loop, # type: ignore[attr-defined]
+ AddThreadSelectorEventLoop,
+ ):
self.skipTest("AddThreadSelectorEventLoop not supported")
server, client = yield self.make_iostream_pair()
try:
import textwrap
import unittest
-from tornado.escape import utf8, to_unicode
from tornado import gen
from tornado.iostream import IOStream
from tornado.log import app_log
# processes, each of which prints its task id to stdout (a single
# byte, so we don't have to worry about atomicity of the shared
# stdout stream) and then exits.
- def run_subproc(self, code):
- proc = subprocess.Popen(
- sys.executable, stdin=subprocess.PIPE, stdout=subprocess.PIPE
- )
- proc.stdin.write(utf8(code))
- proc.stdin.close()
- proc.wait()
- stdout = proc.stdout.read()
- proc.stdout.close()
- if proc.returncode != 0:
- raise RuntimeError(
- "Process returned %d. stdout=%r" % (proc.returncode, stdout)
+ def run_subproc(self, code: str) -> str:
+ try:
+ result = subprocess.run(
+ sys.executable,
+ capture_output=True,
+ input=code,
+ encoding="utf8",
+ check=True,
)
- return to_unicode(stdout)
+ except subprocess.CalledProcessError as e:
+ raise RuntimeError(
+ f"Process returned {e.returncode} stdout={e.stdout}"
+ ) from e
+ return result.stdout
def test_single(self):
# As a sanity check, run the single-process version through this test
pass
test = Test("test_foo")
- self.assertIs(inspect.unwrap(test.test_foo), test.test_foo.orig_method)
+ self.assertIs(
+ inspect.unwrap(test.test_foo),
+ test.test_foo.orig_method, # type: ignore[attr-defined]
+ )
class SetUpTearDownTest(unittest.TestCase):
try:
self.orig_loop = asyncio.get_event_loop()
except RuntimeError:
- self.orig_loop = None
+ self.orig_loop = None # type: ignore[assignment]
self.new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.new_loop)
super().setUp()
if inspect.iscoroutinefunction(f):
coro = pre_coroutine
else:
- coro = gen.coroutine(pre_coroutine)
+ coro = gen.coroutine(pre_coroutine) # type: ignore[assignment]
@functools.wraps(coro)
def post_coroutine(self, *args, **kwargs):
request_time = 1000.0 * request.request_time()
assert request.method is not None
assert request.uri is not None
- summary = request.method + " " + request.uri + " (" + request.remote_ip + ")"
+ summary = (
+ request.method # type: ignore[operator]
+ + " "
+ + request.uri
+ + " ("
+ + request.remote_ip
+ + ")"
+ )
log_method("%d %s %.2fms", status_code, summary, request_time)