self.is_closed = False
self.is_streamed = False
- decoders = [] # type: typing.List[Decoder]
- value = self.headers.get("content-encoding", "identity")
- for part in value.split(","):
- part = part.strip().lower()
- decoder_cls = SUPPORTED_DECODERS[part]
- decoders.append(decoder_cls())
-
- if len(decoders) == 0:
- self.decoder = IdentityDecoder() # type: Decoder
- elif len(decoders) == 1:
- self.decoder = decoders[0]
- else:
- self.decoder = MultiDecoder(decoders)
-
if isinstance(body, bytes):
self.is_closed = True
self.body = self.decoder.decode(body) + self.decoder.flush()
@property
def url(self) -> typing.Optional[URL]:
+ """
+ Returns the URL for which the request was made.
+
+ Requires that `request` was provided when instantiating the response.
+ """
return None if self.request is None else self.request.url
+ @property
+ def decoder(self) -> Decoder:
+ """
+ Returns a decoder instance which can be used to decode the raw byte
+ content, depending on the Content-Encoding used in the response.
+ """
+ if not hasattr(self, "_decoder"):
+ decoders = [] # type: typing.List[Decoder]
+ value = self.headers.get("content-encoding", "identity")
+ for part in value.split(","):
+ part = part.strip().lower()
+ decoder_cls = SUPPORTED_DECODERS[part]
+ decoders.append(decoder_cls())
+
+ if len(decoders) == 1:
+ self._decoder = decoders[0]
+ elif len(decoders) > 1:
+ self._decoder = MultiDecoder(decoders)
+ else:
+ self._decoder = IdentityDecoder()
+
+ return self._decoder
+
async def read(self) -> bytes:
"""
Read and return the response content.