import calendar
import collections.abc
import copy
+import dataclasses
import datetime
import email.utils
from functools import lru_cache
return int(val)
@dataclasses.dataclass
class ParseMultipartConfig:
    """This class configures the parsing of ``multipart/form-data`` request bodies.

    Its primary purpose is to place limits on the size and complexity of request messages
    to avoid potential denial-of-service attacks.

    :raises ValueError: if ``max_parts`` or ``max_part_header_size`` is negative.

    .. versionadded:: 6.5.5
    """

    enabled: bool = True
    """Set this to false to disable the parsing of ``multipart/form-data`` requests entirely.

    This may be desirable for applications that do not need to handle this format, since
    multipart requests have a history of DoS vulnerabilities in Tornado. Multipart requests
    are used primarily for ``<input type="file">`` in HTML forms, or in APIs that mimic this
    format. File uploads that use the HTTP ``PUT`` method generally do not use the multipart
    format.
    """

    max_parts: int = 100
    """The maximum number of parts accepted in a multipart request.

    Each ``<input>`` element in an HTML form corresponds to at least one "part".
    """

    max_part_header_size: int = 10 * 1024
    """The maximum size of the headers for each part of a multipart request.

    The header for a part contains the name of the form field and optionally the filename
    and content type of the uploaded file.
    """

    def __post_init__(self) -> None:
        # The parser rejects a request when a measured count/size is *greater than*
        # the limit, so a negative limit would reject every multipart request.
        # Surface that misconfiguration here instead of at request time.
        for limit_name in ("max_parts", "max_part_header_size"):
            limit = getattr(self, limit_name)
            if limit < 0:
                raise ValueError(f"{limit_name} must be non-negative, got {limit!r}")
+
+
@dataclasses.dataclass
class ParseBodyConfig:
    """Top-level options that control how request bodies are parsed.

    Settings are grouped per body format; at present the only group is the
    one for ``multipart/form-data``.

    .. versionadded:: 6.5.5
    """

    # A factory (rather than a shared instance) gives every ParseBodyConfig
    # its own independent ParseMultipartConfig.
    multipart: ParseMultipartConfig = dataclasses.field(
        default_factory=ParseMultipartConfig
    )
    """Limits and switches applied to ``multipart/form-data`` request bodies."""
+
+
# Process-wide fallback used by the body-parsing functions whenever they are
# called without an explicit ``config`` argument.
_DEFAULT_PARSE_BODY_CONFIG = ParseBodyConfig()


def set_parse_body_config(config: ParseBodyConfig) -> None:
    r"""Sets the **global** default configuration for parsing request bodies.

    This global setting is provided as a stopgap for applications that need to raise the limits
    introduced in Tornado 6.5.5, or that wish to disable the parsing of multipart/form-data
    bodies entirely. Non-global configuration for this functionality will be introduced in a
    future release.

    Raises `TypeError` if ``config`` is not a `ParseBodyConfig`; to restore the defaults,
    pass a fresh ``ParseBodyConfig()`` rather than ``None``.

        >>> content_type = "multipart/form-data; boundary=foo"
        >>> multipart_body = b"--foo--\r\n"
        >>> parse_body_arguments(content_type, multipart_body, {}, {})
        >>> multipart_config = ParseMultipartConfig(enabled=False)
        >>> config = ParseBodyConfig(multipart=multipart_config)
        >>> set_parse_body_config(config)
        >>> parse_body_arguments(content_type, multipart_body, {}, {})
        Traceback (most recent call last):
            ...
        tornado.httputil.HTTPInputError: ...: multipart/form-data parsing is disabled
        >>> set_parse_body_config(ParseBodyConfig())  # reset to defaults

    .. versionadded:: 6.5.5
    """
    global _DEFAULT_PARSE_BODY_CONFIG
    # The parsers read ``.multipart`` from whatever is stored here, so a wrong
    # object (e.g. ``None``) would only fail later, while handling a request.
    # Reject it at configuration time instead.
    if not isinstance(config, ParseBodyConfig):
        raise TypeError(
            f"config must be a ParseBodyConfig, not {type(config).__name__}"
        )
    _DEFAULT_PARSE_BODY_CONFIG = config
+
+
def parse_body_arguments(
content_type: str,
body: bytes,
arguments: Dict[str, List[bytes]],
files: Dict[str, List[HTTPFile]],
headers: Optional[HTTPHeaders] = None,
+ *,
+ config: Optional[ParseBodyConfig] = None,
) -> None:
"""Parses a form request body.
and ``files`` parameters are dictionaries that will be updated
with the parsed contents.
"""
+ if config is None:
+ config = _DEFAULT_PARSE_BODY_CONFIG
if content_type.startswith("application/x-www-form-urlencoded"):
if headers and "Content-Encoding" in headers:
raise HTTPInputError(
)
try:
fields = content_type.split(";")
+ if fields[0].strip() != "multipart/form-data":
+ # This catches "Content-Type: multipart/form-dataxyz"
+ raise HTTPInputError("Invalid content type")
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
- parse_multipart_form_data(utf8(v), body, arguments, files)
+ parse_multipart_form_data(
+ utf8(v), body, arguments, files, config=config.multipart
+ )
break
else:
raise HTTPInputError("multipart boundary not found")
data: bytes,
arguments: Dict[str, List[bytes]],
files: Dict[str, List[HTTPFile]],
+ *,
+ config: Optional[ParseMultipartConfig] = None,
) -> None:
"""Parses a ``multipart/form-data`` body.
Now recognizes non-ASCII filenames in RFC 2231/5987
(``filename*=``) format.
"""
+ if config is None:
+ config = _DEFAULT_PARSE_BODY_CONFIG.multipart
+ if not config.enabled:
+ raise HTTPInputError("multipart/form-data parsing is disabled")
# The standard allows for the boundary to be quoted in the header,
# although it's rare (it happens at least for google app engine
# xmpp). I think we're also supposed to handle backslash-escapes
if final_boundary_index == -1:
raise HTTPInputError("Invalid multipart/form-data: no final boundary found")
parts = data[:final_boundary_index].split(b"--" + boundary + b"\r\n")
+ if len(parts) > config.max_parts:
+ raise HTTPInputError("multipart/form-data has too many parts")
for part in parts:
if not part:
continue
eoh = part.find(b"\r\n\r\n")
if eoh == -1:
raise HTTPInputError("multipart/form-data missing headers")
+ if eoh > config.max_part_header_size:
+ raise HTTPInputError("multipart/form-data part header too large")
headers = HTTPHeaders.parse(part[:eoh].decode("utf-8"), _chars_are_bytes=False)
disp_header = headers.get("Content-Disposition", "")
disposition, disp_params = _parse_header(disp_header)
# type: () -> unittest.TestSuite
import doctest
- return doctest.DocTestSuite()
+ return doctest.DocTestSuite(optionflags=doctest.ELLIPSIS)
_netloc_re = re.compile(r"^(.+):(\d+)$")
qs_to_qsl,
HTTPInputError,
HTTPFile,
+ ParseMultipartConfig,
)
from tornado.escape import utf8, native_str
from tornado.log import gen_log
return time.perf_counter() - start
d1 = f(1_000)
+ # Note that headers larger than this are blocked by the default configuration.
d2 = f(10_000)
if d2 / d1 > 20:
self.fail(f"Disposition param parsing is not linear: {d1=} vs {d2=}")
+ def test_multipart_config(self):
+ boundary = b"1234"
+ body = b"""--1234
+Content-Disposition: form-data; name="files"; filename="ab.txt"
+
+--1234--""".replace(
+ b"\n", b"\r\n"
+ )
+ config = ParseMultipartConfig()
+ args, files = form_data_args()
+ parse_multipart_form_data(boundary, body, args, files, config=config)
+ self.assertEqual(files["files"][0]["filename"], "ab.txt")
+
+ config_no_parts = ParseMultipartConfig(max_parts=0)
+ with self.assertRaises(HTTPInputError) as cm:
+ parse_multipart_form_data(
+ boundary, body, args, files, config=config_no_parts
+ )
+ self.assertIn("too many parts", str(cm.exception))
+
+ config_small_headers = ParseMultipartConfig(max_part_header_size=10)
+ with self.assertRaises(HTTPInputError) as cm:
+ parse_multipart_form_data(
+ boundary, body, args, files, config=config_small_headers
+ )
+ self.assertIn("header too large", str(cm.exception))
+
+ config_disabled = ParseMultipartConfig(enabled=False)
+ with self.assertRaises(HTTPInputError) as cm:
+ parse_multipart_form_data(
+ boundary, body, args, files, config=config_disabled
+ )
+ self.assertIn("multipart/form-data parsing is disabled", str(cm.exception))
+
class HTTPHeadersTest(unittest.TestCase):
def test_multi_line(self):