return result
+class LineDecoder:
+ """
+ Handles incrementally reading lines from text.
+
+ Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n`
+ as line endings, normalizing to `\n`.
+ """
+
+ def __init__(self) -> None:
+ self.buffer = ""
+
+ def decode(self, text: str) -> typing.List[str]:
+ lines = []
+
+ if text.startswith("\n") and self.buffer and self.buffer[-1] == "\r":
+ # Handle the case where we have an "\r\n" split across
+ # our previous input, and our new chunk.
+ lines.append(self.buffer[:-1] + "\n")
+ self.buffer = ""
+ text = text[1:]
+
+ while text:
+ num_chars = len(text)
+ for idx in range(num_chars):
+ char = text[idx]
+ next_char = None if idx + 1 == num_chars else text[idx + 1]
+ if char == "\n":
+ lines.append(self.buffer + text[: idx + 1])
+ self.buffer = ""
+ text = text[idx + 1 :]
+ break
+ elif char == "\r" and next_char == "\n":
+ lines.append(self.buffer + text[:idx] + "\n")
+ self.buffer = ""
+ text = text[idx + 2 :]
+ break
+ elif char == "\r" and next_char is not None:
+ lines.append(self.buffer + text[:idx] + "\n")
+ self.buffer = ""
+ text = text[idx + 1 :]
+ break
+ elif next_char is None:
+ self.buffer = text
+ text = ""
+ break
+
+ return lines
+
+ def flush(self) -> typing.List[str]:
+ if self.buffer.endswith("\r"):
+ # Handle the case where we had a trailing '\r', which could have
+ # been a '\r\n' pair.
+ lines = [self.buffer[:-1] + "\n"]
+ elif self.buffer:
+ lines = [self.buffer]
+ else:
+ lines = []
+ self.buffer = ""
+ return lines
+
+
SUPPORTED_DECODERS = {
"identity": IdentityDecoder,
"gzip": GZipDecoder,
SUPPORTED_DECODERS,
Decoder,
IdentityDecoder,
+ LineDecoder,
MultiDecoder,
TextDecoder,
)
yield decoder.decode(chunk)
yield decoder.flush()
+ async def stream_lines(self) -> typing.AsyncIterator[str]:
+ decoder = LineDecoder()
+ async for text in self.stream_text():
+ for line in decoder.decode(text):
+ yield line
+ for line in decoder.flush():
+ yield line
+
async def raw(self) -> typing.AsyncIterator[bytes]:
"""
A byte-iterator over the raw response content.
assert content == "Hello, world!"
+@pytest.mark.asyncio
+async def test_stream_lines():
+ response = httpx.Response(200, content=b"Hello,\nworld!")
+
+ await response.read()
+
+ content = []
+ async for line in response.stream_lines():
+ content.append(line)
+ assert content == ["Hello,\n", "world!"]
+
+
@pytest.mark.asyncio
async def test_stream_interface_after_read():
response = httpx.Response(200, content=b"Hello, world!")
DeflateDecoder,
GZipDecoder,
IdentityDecoder,
+ LineDecoder,
TextDecoder,
)
assert decoder.flush() == ""
+def test_line_decoder_nl():
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\n\nb\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.flush() == ["c"]
+
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\n\nb\nc\n") == ["a\n", "\n", "b\n", "c\n"]
+ assert decoder.flush() == []
+
+
+def test_line_decoder_cr():
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\r\rb\rc") == ["a\n", "\n", "b\n"]
+ assert decoder.flush() == ["c"]
+
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\r\rb\rc\r") == ["a\n", "\n", "b\n"]
+ assert decoder.flush() == ["c\n"]
+
+
+def test_line_decoder_crnl():
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\r\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.flush() == ["c"]
+
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\r\n\r\nb\r\nc\r\n") == ["a\n", "\n", "b\n", "c\n"]
+ assert decoder.flush() == []
+
+ decoder = LineDecoder()
+ assert decoder.decode("") == []
+ assert decoder.decode("a\r") == []
+ assert decoder.decode("\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.flush() == ["c"]
+
+
def test_invalid_content_encoding_header():
headers = [(b"Content-Encoding", b"invalid-header")]
body = b"test 123"