def decode(self, content: bytes) -> typing.List[bytes]:
if self._chunk_size is None:
- return [content]
+ return [content] if content else []
self._buffer.write(content)
if self._buffer.tell() >= self._chunk_size:
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
- yield chunk
+ yield chunk # pragma: nocover
for chunk in chunker.flush():
yield chunk
yield chunk
decoded = decoder.flush()
for chunk in chunker.decode(decoded):
- yield chunk
+ yield chunk # pragma: nocover
for chunk in chunker.flush():
yield chunk
assert parts == [b"Hello, world!"]
+def test_iter_raw_doesnt_return_empty_chunks():
+ def streaming_body_with_empty_chunks():
+ yield b"Hello, "
+ yield b""
+ yield b"world!"
+ yield b""
+
+ response = httpx.Response(200, content=streaming_body_with_empty_chunks())
+
+ parts = [part for part in response.iter_raw()]
+ assert parts == [b"Hello, ", b"world!"]
+
+
def test_iter_raw_on_iterable():
response = httpx.Response(
200,
assert parts == []
+def test_iter_bytes_doesnt_return_empty_chunks():
+ def streaming_body_with_empty_chunks():
+ yield b"Hello, "
+ yield b""
+ yield b"world!"
+ yield b""
+
+ response = httpx.Response(200, content=streaming_body_with_empty_chunks())
+
+ parts = [part for part in response.iter_bytes()]
+ assert parts == [b"Hello, ", b"world!"]
+
+
@pytest.mark.asyncio
async def test_aiter_bytes():
response = httpx.Response(