been read. The result is true if the stream is still open.
"""
if self.params.decompress:
- delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
+ delegate = _GzipMessageDelegate(
+ delegate, self.params.chunk_size, self._max_body_size
+ )
return self._read_message(delegate)
async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
"""Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""
- def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
+ def __init__(
+ self,
+ delegate: httputil.HTTPMessageDelegate,
+ chunk_size: int,
+ max_body_size: int,
+ ) -> None:
self._delegate = delegate
self._chunk_size = chunk_size
+ self._max_body_size = max_body_size
+ self._decompressed_body_size = 0
self._decompressor: GzipDecompressor | None = None
def headers_received(
compressed_data, self._chunk_size
)
if decompressed:
+ self._decompressed_body_size += len(decompressed)
+ if self._decompressed_body_size > self._max_body_size:
+ raise httputil.HTTPInputError("decompressed body too large")
ret = self._delegate.data_received(decompressed)
if ret is not None:
await ret
class GzipTest(GzipBaseTest, AsyncHTTPTestCase):
def get_httpserver_options(self):
- return dict(decompress_request=True)
+ return dict(decompress_request=True, max_body_size=100)
def test_gzip(self):
response = self.post_gzip("foo=bar")
)
self.assertEqual(json_decode(response.body), {"foo": ["bar"]})
+ def test_size_limit(self):
+ with ExpectLog(gen_log, ".*decompressed body too large", level=logging.INFO):
+ self.post_gzip("x" * 101)
+
class GzipUnsupportedTest(GzipBaseTest, AsyncHTTPTestCase):
def test_gzip_unsupported(self):