# via sphinx
babel==2.17.0
# via sphinx
-black==25.1.0
+black==26.3.1
# via -r requirements.in
build==1.2.2.post1
# via pip-tools
# sphinx
# tox
# wheel
-pathspec==0.12.1
+pathspec==1.0.4
# via black
pip-tools==7.4.1
# via -r requirements.in
# via
# build
# pip-tools
+pytokens==0.4.1
+ # via black
requests==2.32.4
# via sphinx
roman-numerals-py==3.1.0
while self._free_list and self._requests:
started += 1
curl = self._free_list.pop()
- (request, callback, queue_start_time) = self._requests.popleft()
+ request, callback, queue_start_time = self._requests.popleft()
# TODO: Don't smuggle extra data on an attribute of the Curl object.
curl.info = { # type: ignore
"headers": httputil.HTTPHeaders(),
if header_line.startswith("HTTP/"):
headers.clear()
try:
- (_version, _code, reason) = httputil.parse_response_start_line(
+ _version, _code, reason = httputil.parse_response_start_line(
header_line
)
header_line = "X-Http-Reason: %s" % reason
from tornado.escape import native_str, parse_qs_bytes, utf8, to_unicode
from tornado.util import ObjectDict, unicode_type
-
# responses is unused in this file, but we re-export it to other files.
# Reference it so pyflakes doesn't complain.
responses
`logging` module. For example, you may wish to send ``tornado.access`` logs
to a separate file for analysis.
"""
+
import logging
import logging.handlers
import sys
#
# The last line of each such test's "main" program should be
# exec(open("run_twice_magic.py").read())
- self.write_files(
- {
- "run_twice_magic.py": """
+ self.write_files({"run_twice_magic.py": """
import os
import sys
tornado.autoreload._reload()
else:
os._exit(0)
- """
- }
- )
+ """})
def tearDown(self):
try:
from tornado.web import Application, RequestHandler
from tornado import gen
-
try:
import pycurl
except ImportError:
request_data = yield stream.read_until(b"\r\n\r\n")
if b"HTTP/1." not in request_data:
self.skipTest("requires HTTP/1.x")
- yield stream.write(
- b"""\
+ yield stream.write(b"""\
HTTP/1.1 200 OK
Transfer-Encoding: chunked
2
0
-""".replace(
- b"\n", b"\r\n"
- )
- )
+""".replace(b"\n", b"\r\n"))
stream.close()
netutil.add_accept_handler(sock, accept_callback) # type: ignore
request_data = yield stream.read_until(b"\r\n\r\n")
if b"HTTP/1." not in request_data:
self.skipTest("requires HTTP/1.x")
- yield stream.write(
- b"""\
+ yield stream.write(b"""\
HTTP/1.1 200 OK
X-XSS-Protection: 1;
\tmode=block
-""".replace(
- b"\n", b"\r\n"
- )
- )
+""".replace(b"\n", b"\r\n"))
stream.close()
netutil.add_accept_handler(sock, accept_callback) # type: ignore
def test_chunked_request_body(self):
# Chunked requests are not widely supported and we don't have a way
# to generate them in AsyncHTTPClient, but HTTPServer will read them.
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: chunked
bar
0
-""".replace(
- b"\n", b"\r\n"
- )
- )
+""".replace(b"\n", b"\r\n"))
start_line, headers, response = self.io_loop.run_sync(
lambda: read_stream_body(self.stream)
)
def test_chunked_request_uppercase(self):
# As per RFC 2616 section 3.6, "Transfer-Encoding" header's value is
# case-insensitive.
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: Chunked
bar
0
-""".replace(
- b"\n", b"\r\n"
- )
- )
+""".replace(b"\n", b"\r\n"))
start_line, headers, response = self.io_loop.run_sync(
lambda: read_stream_body(self.stream)
)
def test_chunked_request_body_invalid_size(self):
# Only hex digits are allowed in chunk sizes. Python's int() function
# also accepts underscores, so make sure we reject them here.
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: chunked
1234567890abcdef1234567890
0
-""".replace(
- b"\n", b"\r\n"
- )
- )
+""".replace(b"\n", b"\r\n"))
with ExpectLog(gen_log, ".*invalid chunk size", level=logging.INFO):
start_line, headers, response = self.io_loop.run_sync(
lambda: read_stream_body(self.stream)
def test_chunked_request_body_duplicate_header(self):
# Repeated Transfer-Encoding headers should be an error (and not confuse
# the chunked-encoding detection to mess up framing).
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: chunked
ok
0
-"""
- )
+""")
with ExpectLog(
gen_log,
".*Unsupported Transfer-Encoding chunked,chunked",
def test_chunked_request_body_unsupported_transfer_encoding(self):
# We don't support transfer-encodings other than chunked.
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: gzip, chunked
ok
0
-"""
- )
+""")
with ExpectLog(
gen_log, ".*Unsupported Transfer-Encoding gzip, chunked", level=logging.INFO
):
def test_chunked_request_body_transfer_encoding_and_content_length(self):
# Transfer-encoding and content-length are mutually exclusive
- self.stream.write(
- b"""\
+ self.stream.write(b"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Transfer-Encoding: chunked
ok
0
-"""
- )
+""")
with ExpectLog(
gen_log,
".*Message with both Transfer-Encoding and Content-Length",
level=logging.INFO,
):
yield stream.connect(("127.0.0.1", self.get_http_port()))
- stream.write(
- utf8(
- textwrap.dedent(
- f"""\
+ stream.write(utf8(textwrap.dedent(f"""\
POST /echo HTTP/1.1
Host: 127.0.0.1
Content-Length: {value}
Connection: close
1234567890
- """
- ).replace("\n", "\r\n")
- )
- )
+ """).replace("\n", "\r\n")))
yield stream.read_until_close()
@gen_test
Content-Disposition: form-data; name="files"; filename="ab.txt"
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
Content-Disposition: form-data; name=files; filename=ab.txt
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
Content-Disposition: form-data; name="files"; filename="%s"
Foo
---1234--""" % filename.replace(
- "\\", "\\\\"
- ).replace(
- '"', '\\"'
- )
+--1234--""" % filename.replace("\\", "\\\\").replace('"', '\\"')
data = utf8(str_data.replace("\n", "\r\n"))
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
Content-Disposition: form-data; name="files"; filename="%s"
Foo
---1234--""" % filename.replace(
- "\\", "\\\\"
- ).replace(
- '"', '\\"'
- )
+--1234--""" % filename.replace("\\", "\\\\").replace('"', '\\"')
data = utf8(str_data.replace("\n", "\r\n"))
args, files = form_data_args()
with self.assertRaises(HTTPInputError) as cm:
Content-Disposition: form-data; name="files"; filename="ab.txt"; filename*=UTF-8''%C3%A1b.txt
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
Content-Disposition: form-data; name="files"; filename="测试.txt"
Foo
---1234--""".encode(
- "utf-8"
- ).replace(
- b"\n", b"\r\n"
- )
+--1234--""".encode("utf-8").replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
Content-Disposition: form-data; name="files"; filename="ab.txt"
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b'"1234"', data, args, files)
file = files["files"][0]
--1234
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
with self.assertRaises(
HTTPInputError, msg="multipart/form-data missing headers"
Content-Disposition: invalid; name="files"; filename="ab.txt"
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
with self.assertRaises(HTTPInputError, msg="Invalid multipart/form-data"):
parse_multipart_form_data(b"1234", data, args, files)
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"
-Foo--1234--""".replace(
- b"\n", b"\r\n"
- )
+Foo--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
with self.assertRaises(HTTPInputError, msg="Invalid multipart/form-data"):
parse_multipart_form_data(b"1234", data, args, files)
Content-Disposition: form-data; filename="ab.txt"
Foo
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
args, files = form_data_args()
with self.assertRaises(
HTTPInputError, msg="multipart/form-data value missing name"
Foo
--1234--
-""".replace(
- b"\n", b"\r\n"
- )
+""".replace(b"\n", b"\r\n")
args, files = form_data_args()
parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
body = b"""--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"
---1234--""".replace(
- b"\n", b"\r\n"
- )
+--1234--""".replace(b"\n", b"\r\n")
config = ParseMultipartConfig()
args, files = form_data_args()
parse_multipart_form_data(boundary, body, args, files, config=config)
Foo: even
more
lines
-""".replace(
- "\n", "\r\n"
- )
+""".replace("\n", "\r\n")
headers = HTTPHeaders.parse(data)
self.assertEqual(headers["asdf"], "qwer zxcv")
self.assertEqual(headers.get_list("asdf"), ["qwer zxcv"])
from tornado.options import define, add_parse_callback, options
from tornado.test.util import ABT_SKIP_MESSAGE
-
TEST_MODULES = [
"tornado.httputil.doctests",
"tornado.iostream.doctests",
def test_listen_single(self):
# As a sanity check, run the single-process version through this test
# harness too.
- code = textwrap.dedent(
- """
+ code = textwrap.dedent("""
import asyncio
from tornado.tcpserver import TCPServer
asyncio.run(main())
print('012', end='')
- """
- )
+ """)
out, err = self.run_subproc(code)
self.assertEqual("".join(sorted(out)), "012")
self.assertEqual(err, "")
def test_bind_start(self):
- code = textwrap.dedent(
- """
+ code = textwrap.dedent("""
import warnings
from tornado.ioloop import IOLoop
server.start(3)
IOLoop.current().run_sync(lambda: None)
print(task_id(), end='')
- """
- )
+ """)
out, err = self.run_subproc(code)
self.assertEqual("".join(sorted(out)), "012")
self.assertEqual(err, "")
def test_add_sockets(self):
- code = textwrap.dedent(
- """
+ code = textwrap.dedent("""
import asyncio
from tornado.netutil import bind_sockets
from tornado.process import fork_processes, task_id
server.add_sockets(sockets)
asyncio.run(post_fork_main())
print(task_id(), end='')
- """
- )
+ """)
out, err = self.run_subproc(code)
self.assertEqual("".join(sorted(out)), "012")
self.assertEqual(err, "")
def test_listen_multi_reuse_port(self):
- code = textwrap.dedent(
- """
+ code = textwrap.dedent("""
import asyncio
import socket
from tornado.netutil import bind_sockets
server.listen(port, address='127.0.0.1', reuse_port=True)
asyncio.run(main())
print(task_id(), end='')
- """
- )
+ """)
out, err = self.run_subproc(code)
self.assertEqual("".join(sorted(out)), "012")
self.assertEqual(err, "")
self.assertEqual(template.generate(), b"")
def test_try(self):
- template = Template(
- utf8(
- """{% try %}
+ template = Template(utf8("""{% try %}
try{% set y = 1/x %}
{% except %}-except
{% else %}-else
{% finally %}-finally
-{% end %}"""
- )
- )
+{% end %}"""))
self.assertEqual(template.generate(x=1), b"\ntry\n-else\n-finally\n")
self.assertEqual(template.generate(x=0), b"\ntry-except\n-finally\n")
self.assertEqual(template.generate(), b"foo")
def test_break_continue(self):
- template = Template(
- utf8(
- """\
+ template = Template(utf8("""\
{% for i in range(10) %}
{% if i == 2 %}
{% continue %}
{% if i == 6 %}
{% break %}
{% end %}
-{% end %}"""
- )
- )
+{% end %}"""))
result = template.generate()
# remove extraneous whitespace
result = b"".join(result.split())
class StackTraceTest(unittest.TestCase):
def test_error_line_number_expression(self):
- loader = DictLoader(
- {
- "test.html": """one
+ loader = DictLoader({"test.html": """one
two{{1/0}}
three
- """
- }
- )
+ """})
try:
loader.load("test.html").generate()
self.fail("did not get expected exception")
self.assertTrue("# test.html:2" in traceback.format_exc())
def test_error_line_number_directive(self):
- loader = DictLoader(
- {
- "test.html": """one
+ loader = DictLoader({"test.html": """one
two{%if 1/0%}
three{%end%}
- """
- }
- )
+ """})
try:
loader.load("test.html").generate()
self.fail("did not get expected exception")
# Whitespace including newlines is allowed within template tags
# and directives, and this is one way to avoid long lines while
# keeping extra whitespace out of the rendered output.
- loader = DictLoader(
- {
- "foo.txt": """\
+ loader = DictLoader({"foo.txt": """\
{% for i in items
%}{% if i > 0 %}, {% end %}{#
#}{{i
}}{% end
-%}"""
- }
- )
+%}"""})
self.assertEqual(
loader.load("foo.txt").generate(items=range(5)), b"0, 1, 2, 3, 4"
)
self.assertEqual(loader.load("bar.txt").generate(), b" bar ")
def test_whitespace_directive(self):
- loader = DictLoader(
- {
- "foo.html": """\
+ loader = DictLoader({"foo.html": """\
{% whitespace oneline %}
{% for i in range(3) %}
{{ i }}
{% end %}
{% whitespace all %}
pre\tformatted
-"""
- }
- )
+"""})
self.assertEqual(
loader.load("foo.html").generate(), b" 0 1 2 \n pre\tformatted\n"
)
#
# The annotations future became available in python 3.7 but has been replaced by PEP 649, so
# it should remain supported but off-by-default for the foreseeable future.
- code1 = textwrap.dedent(
- """
+ code1 = textwrap.dedent("""
from __future__ import annotations
from tornado.util import exec_in
exec_in(code2, globals())
- """
- )
+ """)
- code2 = textwrap.dedent(
- """
+ code2 = textwrap.dedent("""
def f(x: int) -> int:
return x + 1
output[0] = f.__annotations__
- """
- )
+ """)
# Make a mutable container to pass the result back to the caller
output = [None]