--- /dev/null
+try:
+ import brotli
+except ImportError:
+ brotli = None
import typing
from urllib.parse import urlsplit
-from .decoders import IdentityDecoder
+from .decoders import SUPPORTED_DECODERS, Decoder, IdentityDecoder, MultiDecoder
from .exceptions import ResponseClosed, StreamConsumed
self.on_close = on_close
self.is_closed = False
self.is_streamed = False
- self.decoder = IdentityDecoder()
+
+ decoders = [] # type: typing.List[Decoder]
+ for header, value in self.headers:
+ if header.strip().lower() == b"content-encoding":
+ for part in value.split(b","):
+ part = part.strip().lower()
+ decoder_cls = SUPPORTED_DECODERS[part]
+ decoders.append(decoder_cls())
+
+ if len(decoders) == 0:
+ self.decoder = IdentityDecoder() # type: Decoder
+ elif len(decoders) == 1:
+ self.decoder = decoders[0]
+ else:
+ self.decoder = MultiDecoder(decoders)
+
if isinstance(body, bytes):
self.is_closed = True
- self.body = body
+ self.body = self.decoder.decode(body) + self.decoder.flush()
else:
self.body_aiter = body
"""
Handlers for Content-Encoding.
+
+See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
+import typing
+import zlib
+
+from .compat import brotli
+
+
+class Decoder:
+ def decode(self, data: bytes) -> bytes:
+ raise NotImplementedError() # pragma: nocover
+
+ def flush(self) -> bytes:
+ raise NotImplementedError() # pragma: nocover
+
+
+class IdentityDecoder(Decoder):
+ def decode(self, data: bytes) -> bytes:
+ return data
+
+ def flush(self) -> bytes:
+ return b""
+
+
+class DeflateDecoder(Decoder):
+ """
+ Handle 'deflate' decoding.
+
+ See: https://stackoverflow.com/questions/1838699
+ """
+
+ def __init__(self) -> None:
+ self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
+
+ def decode(self, data: bytes) -> bytes:
+ return self.decompressor.decompress(data)
+
+ def flush(self) -> bytes:
+ return self.decompressor.flush()
-class IdentityDecoder:
- def decode(self, chunk: bytes) -> bytes:
- return chunk
+class GZipDecoder(Decoder):
+ """
+ Handle 'gzip' decoding.
+
+ See: https://stackoverflow.com/questions/1838699
+ """
+
+ def __init__(self) -> None:
+ self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
+
+ def decode(self, data: bytes) -> bytes:
+ return self.decompressor.decompress(data)
+
+ def flush(self) -> bytes:
+ return self.decompressor.flush()
+
+
+class BrotliDecoder(Decoder):
+ """
+ Handle 'brotli' decoding.
+
+ Requires `pip install brotlipy`.
+ See: https://brotlipy.readthedocs.io/
+ """
+
+ def __init__(self) -> None:
+ assert (
+ brotli is not None
+ ), "The 'brotlipy' library must be installed to use 'BrotliDecoder'"
+ self.decompressor = brotli.Decompressor()
+
+ def decode(self, data: bytes) -> bytes:
+ return self.decompressor.decompress(data)
def flush(self) -> bytes:
+ self.decompressor.finish()
return b""
-# class DeflateDecoder:
-# pass
-#
-#
-# class GZipDecoder:
-# pass
-#
-#
-# class BrotliDecoder:
-# pass
-#
-#
-# class MultiDecoder:
-# def __init__(self, children):
-# self.children = children
-#
-# def decode(self, chunk: bytes) -> bytes:
-# data = chunk
-# for child in children:
-# data = child.decode(data)
-# return data
-#
-# def flush(self) -> bytes:
-# data = b''
-# for child in children:
-# data = child.decode(data)
-# data = child.flush()
-# return data
+class MultiDecoder(Decoder):
+ """
+ Handle the case where mutliple encodings have been applied.
+ """
+
+ def __init__(self, children: typing.Sequence[Decoder]) -> None:
+ """
+ children should be a sequence of decoders in the order in which
+ each was applied.
+ """
+ # Note that we reverse the order for decoding.
+ self.children = list(reversed(children))
+
+ def decode(self, data: bytes) -> bytes:
+ for child in self.children:
+ data = child.decode(data)
+ return data
+
+ def flush(self) -> bytes:
+ data = b""
+ for child in self.children:
+ data = child.decode(data) + child.flush()
+ return data
+
+
+SUPPORTED_DECODERS = {
+ b"gzip": GZipDecoder,
+ b"deflate": DeflateDecoder,
+ b"identity": IdentityDecoder,
+ b"br": BrotliDecoder,
+}
+
+
+if brotli is None:
+ SUPPORTED_DECODERS.pop(b"br") # pragma: nocover
certifi
h11
+# Optional
+brotlipy
+
+
# Testing
autoflake
black
--- /dev/null
+import zlib
+
+import brotli
+import pytest
+
+import httpcore
+
+
+def test_deflate():
+ body = b"test 123"
+ compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_body = compressor.compress(body) + compressor.flush()
+
+ headers = [(b"Content-Encoding", b"deflate")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+
+def test_gzip():
+ body = b"test 123"
+ compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+ compressed_body = compressor.compress(body) + compressor.flush()
+
+ headers = [(b"Content-Encoding", b"gzip")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+
+def test_brotli():
+ body = b"test 123"
+ compressed_body = brotli.compress(body)
+
+ headers = [(b"Content-Encoding", b"br")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+
+def test_multi():
+ body = b"test 123"
+
+ deflate_compressor = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
+ compressed_body = deflate_compressor.compress(body) + deflate_compressor.flush()
+
+ gzip_compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+ compressed_body = (
+ gzip_compressor.compress(compressed_body) + gzip_compressor.flush()
+ )
+
+ headers = [(b"Content-Encoding", b"deflate, gzip")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+
+def test_multi_with_identity():
+ body = b"test 123"
+ compressed_body = brotli.compress(body)
+
+ headers = [(b"Content-Encoding", b"br, identity")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+ headers = [(b"Content-Encoding", b"identity, br")]
+ response = httpcore.Response(200, headers=headers, body=compressed_body)
+ assert response.body == body
+
+
+@pytest.mark.asyncio
+async def test_streaming():
+ body = b"test 123"
+ compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+
+ async def compress(body):
+ yield compressor.compress(body)
+ yield compressor.flush()
+
+ headers = [(b"Content-Encoding", b"gzip")]
+ response = httpcore.Response(200, headers=headers, body=compress(body))
+ assert not hasattr(response, "body")
+ assert await response.read() == body