brotli = None
-class Decoder:
+class ContentDecoder:
def decode(self, data: bytes) -> bytes:
raise NotImplementedError() # pragma: nocover
raise NotImplementedError() # pragma: nocover
-class IdentityDecoder(Decoder):
+class IdentityDecoder(ContentDecoder):
"""
Handle unencoded data.
"""
return b""
-class DeflateDecoder(Decoder):
+class DeflateDecoder(ContentDecoder):
"""
Handle 'deflate' decoding.
raise ValueError(str(exc))
-class GZipDecoder(Decoder):
+class GZipDecoder(ContentDecoder):
"""
Handle 'gzip' decoding.
raise ValueError(str(exc))
-class BrotliDecoder(Decoder):
+class BrotliDecoder(ContentDecoder):
"""
Handle 'brotli' decoding.
raise ValueError(str(exc))
-class MultiDecoder(Decoder):
+class MultiDecoder(ContentDecoder):
"""
Handle the case where multiple encodings have been applied.
"""
- def __init__(self, children: typing.Sequence[Decoder]) -> None:
+ def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
"""
'children' should be a sequence of decoders in the order in which
each was applied.
from ._content_streams import ByteStream, ContentStream, encode
from ._decoders import (
SUPPORTED_DECODERS,
- Decoder,
+ ContentDecoder,
IdentityDecoder,
LineDecoder,
MultiDecoder,
"""
return chardet.detect(self.content)["encoding"]
- @property
- def decoder(self) -> Decoder:
+ def _get_content_decoder(self) -> ContentDecoder:
"""
Returns a decoder instance which can be used to decode the raw byte
content, depending on the Content-Encoding used in the response.
"""
if not hasattr(self, "_decoder"):
- decoders: typing.List[Decoder] = []
+ decoders: typing.List[ContentDecoder] = []
values = self.headers.get_list("content-encoding", split_commas=True)
for value in values:
value = value.strip().lower()
if hasattr(self, "_content"):
yield self._content
else:
+ decoder = self._get_content_decoder()
with self._wrap_decoder_errors():
for chunk in self.iter_raw():
- yield self.decoder.decode(chunk)
- yield self.decoder.flush()
+ yield decoder.decode(chunk)
+ yield decoder.flush()
def iter_text(self) -> typing.Iterator[str]:
"""
if hasattr(self, "_content"):
yield self._content
else:
+ decoder = self._get_content_decoder()
with self._wrap_decoder_errors():
async for chunk in self.aiter_raw():
- yield self.decoder.decode(chunk)
- yield self.decoder.flush()
+ yield decoder.decode(chunk)
+ yield decoder.flush()
async def aiter_text(self) -> typing.AsyncIterator[str]:
"""