def decode(self, text: str) -> typing.List[str]:
lines = []
- if text.startswith("\n") and self.buffer and self.buffer[-1] == "\r":
- # Handle the case where we have an "\r\n" split across
- # our previous input, and our new chunk.
- lines.append(self.buffer[:-1] + "\n")
- self.buffer = ""
- text = text[1:]
+ if text and self.buffer and self.buffer[-1] == "\r":
+ if text.startswith("\n"):
+ # Handle the case where we have an "\r\n" split across
+ # our previous input, and our new chunk.
+ lines.append(self.buffer[:-1] + "\n")
+ self.buffer = ""
+ text = text[1:]
+ else:
+ # Handle the case where we have "\r" at the end of our
+ # previous input.
+ lines.append(self.buffer[:-1] + "\n")
+ self.buffer = ""
while text:
num_chars = len(text)
assert decoder.flush() == ["c\n"]
# Issue #1033
- # TODO: This seems like another bug; fix expectations and results.
decoder = LineDecoder()
assert decoder.decode("") == []
assert decoder.decode("12345\r") == []
- assert decoder.decode("foo ") == []
+ assert decoder.decode("foo ") == ["12345\n"]
assert decoder.decode("bar ") == []
assert decoder.decode("baz\r") == []
- assert decoder.flush() == ["12345\rfoo bar baz\n"]
+ assert decoder.flush() == ["foo bar baz\n"]
def test_line_decoder_crnl():