start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
headers: httputil.HTTPHeaders,
) -> Optional[Awaitable[None]]:
- if headers.get("Content-Encoding") == "gzip":
+ if headers.get("Content-Encoding", "").lower() == "gzip":
self._decompressor = GzipDecompressor()
# Downstream delegates will only see uncompressed data,
# so rename the content-encoding header.
response = self.post_gzip("foo=bar")
self.assertEqual(json_decode(response.body), {u"foo": [u"bar"]})
+ def test_gzip_case_insensitive(self):
+ # https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.2.1
+ bytesio = BytesIO()
+ gzip_file = gzip.GzipFile(mode="w", fileobj=bytesio)
+ gzip_file.write(utf8("foo=bar"))
+ gzip_file.close()
+ compressed_body = bytesio.getvalue()
+ response = self.fetch(
+ "/",
+ method="POST",
+ body=compressed_body,
+ headers={"Content-Encoding": "GZIP"},
+ )
+ self.assertEqual(json_decode(response.body), {u"foo": [u"bar"]})
+
class GzipUnsupportedTest(GzipBaseTest, AsyncHTTPTestCase):
def test_gzip_unsupported(self):