From 3d30ca1ad9916fa3b0597dd90185b01591190d57 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Wed, 28 Aug 2019 22:56:31 -0400 Subject: [PATCH] httputil.parse_body_arguments: allow incomplete url-escaping support x-www-form-urlencoded body with values consisting of encoded bytes which are not url-encoded into ascii (it seems other web frameworks often support this) add bytes qs support to escape.parse_qs_bytes, leave str qs support for backwards compatibility --- tornado/escape.py | 8 +++--- tornado/httputil.py | 3 ++- tornado/test/httpserver_test.py | 44 ++++++++++++++++++++++++--------- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/tornado/escape.py b/tornado/escape.py index b0ec33230..8d2d6bc88 100644 --- a/tornado/escape.py +++ b/tornado/escape.py @@ -145,10 +145,10 @@ def url_unescape( # noqa: F811 def parse_qs_bytes( - qs: str, keep_blank_values: bool = False, strict_parsing: bool = False + qs: Union[str, bytes], keep_blank_values: bool = False, strict_parsing: bool = False ) -> Dict[str, List[bytes]]: - """Parses a query string like urlparse.parse_qs, but returns the - values as byte strings. + """Parses a query string like urlparse.parse_qs, + but takes bytes and returns the values as byte strings. Keys still become type str (interpreted as latin1 in python3!) because it's too painful to keep them as byte strings in @@ -156,6 +156,8 @@ def parse_qs_bytes( """ # This is gross, but python3 doesn't give us another way. # Latin1 is the universal donor of character encodings. + if isinstance(qs, bytes): + qs = qs.decode("latin1") result = urllib.parse.parse_qs( qs, keep_blank_values, strict_parsing, encoding="latin1", errors="strict" ) diff --git a/tornado/httputil.py b/tornado/httputil.py index 26a6c440e..758d30d96 100644 --- a/tornado/httputil.py +++ b/tornado/httputil.py @@ -783,7 +783,8 @@ def parse_body_arguments( ) return try: - uri_arguments = parse_qs_bytes(native_str(body), keep_blank_values=True) + # real charset decoding will happen in RequestHandler.decode_argument() + uri_arguments = parse_qs_bytes(body, keep_blank_values=True) except Exception as e: gen_log.warning("Invalid x-www-form-urlencoded body: %s", e) uri_arguments = {} diff --git a/tornado/test/httpserver_test.py b/tornado/test/httpserver_test.py index 1bc8def83..4ad3ce7e0 100644 --- a/tornado/test/httpserver_test.py +++ b/tornado/test/httpserver_test.py @@ -41,6 +41,7 @@ import ssl import sys import tempfile import unittest +import urllib.parse from io import BytesIO import typing @@ -378,6 +379,19 @@ class TypeCheckHandler(RequestHandler): self.errors[name] = "expected %s, got %s" % (expected_type, actual_type) +class PostEchoHandler(RequestHandler): + def post(self, *path_args): + self.write(dict(echo=self.get_argument("data"))) + + +class PostEchoGBKHandler(PostEchoHandler): + def decode_argument(self, value, name=None): + try: + return value.decode("gbk") + except Exception: + raise HTTPError(400, "invalid gbk bytes: %r" % value) + + class HTTPServerTest(AsyncHTTPTestCase): def get_app(self): return Application( @@ -385,6 +399,8 @@ class HTTPServerTest(AsyncHTTPTestCase): ("/echo", EchoHandler), ("/typecheck", TypeCheckHandler), ("//doubleslash", EchoHandler), + ("/post_utf8", PostEchoHandler), + ("/post_gbk", PostEchoGBKHandler), ] ) @@ -423,18 +439,22 @@ class HTTPServerTest(AsyncHTTPTestCase): self.assertEqual(200, response.code) self.assertEqual(json_decode(response.body), {}) - def test_malformed_body(self): - # parse_qs is pretty forgiving, but it will fail on python 3 - # if the data is not utf8. - with ExpectLog(gen_log, "Invalid x-www-form-urlencoded body"): - response = self.fetch( - "/echo", - method="POST", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - body=b"\xe9", - ) - self.assertEqual(200, response.code) - self.assertEqual(b"{}", response.body) + def test_post_encodings(self): + headers = {"Content-Type": "application/x-www-form-urlencoded"} + uni_text = "chinese: \u5f20\u4e09" + for enc in ("utf8", "gbk"): + for quote in (True, False): + with self.subTest(enc=enc, quote=quote): + bin_text = uni_text.encode(enc) + if quote: + bin_text = urllib.parse.quote(bin_text).encode("ascii") + response = self.fetch( + "/post_" + enc, + method="POST", + headers=headers, + body=(b"data=" + bin_text), + ) + self.assertEqual(json_decode(response.body), {"echo": uni_text}) class HTTPServerRawTest(AsyncHTTPTestCase): -- 2.47.2