def decode(self, content: str) -> typing.List[str]:
if self._chunk_size is None:
- return [content]
+ return [content] if content else []
self._buffer.write(content)
if self._buffer.tell() >= self._chunk_size:
text = text[:-1]
if not text:
- return []
+ # NOTE: the edge case input of empty text doesn't occur in practice,
+ # because other httpx internals filter out this value
+ return [] # pragma: no cover
trailing_newline = text[-1] in NEWLINE_CHARS
lines = text.splitlines()
yield chunk
text_content = decoder.flush()
for chunk in chunker.decode(text_content):
- yield chunk
+ yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk
yield chunk
text_content = decoder.flush()
for chunk in chunker.decode(text_content):
- yield chunk
+ yield chunk # pragma: no cover
for chunk in chunker.flush():
yield chunk
assert response.text == ""
+@pytest.mark.parametrize(
+ ["data", "expected"],
+ [((b"Hello,", b" world!"), ["Hello,", " world!"])],
+)
+def test_streaming_text_decoder(
+ data: typing.Iterable[bytes], expected: typing.List[str]
+) -> None:
+ response = httpx.Response(200, content=iter(data))
+ assert list(response.iter_text()) == expected
+
+
def test_line_decoder_nl():
response = httpx.Response(200, content=[b""])
assert list(response.iter_lines()) == []