"""
Handles incrementally reading lines from text.
- Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n`
- as line endings, normalizing to `\n`.
+ Has the same behaviour as the stdlib `str.splitlines`, but handles the input incrementally.
"""
def __init__(self) -> None:
- self.buffer = ""
+ self.buffer: typing.List[str] = []
+ self.trailing_cr: bool = False
def decode(self, text: str) -> typing.List[str]:
- lines = []
-
- if text and self.buffer and self.buffer[-1] == "\r":
- if text.startswith("\n"):
- # Handle the case where we have an "\r\n" split across
- # our previous input, and our new chunk.
- lines.append(self.buffer[:-1] + "\n")
- self.buffer = ""
- text = text[1:]
- else:
- # Handle the case where we have "\r" at the end of our
- # previous input.
- lines.append(self.buffer[:-1] + "\n")
- self.buffer = ""
-
- while text:
- num_chars = len(text)
- for idx in range(num_chars):
- char = text[idx]
- next_char = None if idx + 1 == num_chars else text[idx + 1]
- if char == "\n":
- lines.append(self.buffer + text[: idx + 1])
- self.buffer = ""
- text = text[idx + 1 :]
- break
- elif char == "\r" and next_char == "\n":
- lines.append(self.buffer + text[:idx] + "\n")
- self.buffer = ""
- text = text[idx + 2 :]
- break
- elif char == "\r" and next_char is not None:
- lines.append(self.buffer + text[:idx] + "\n")
- self.buffer = ""
- text = text[idx + 1 :]
- break
- elif next_char is None:
- self.buffer += text
- text = ""
- break
+ # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
+ NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
+
+ # We always push a trailing `\r` into the next decode iteration, because it
+ # may be the first half of a `\r\n` pair split across two chunks.
+ if self.trailing_cr:
+ text = "\r" + text
+ self.trailing_cr = False
+ if text.endswith("\r"):
+ self.trailing_cr = True
+ text = text[:-1]
+
+ if not text:
+ return []
+
+ trailing_newline = text[-1] in NEWLINE_CHARS
+ lines = text.splitlines()
+
+ if len(lines) == 1 and not trailing_newline:
+ # No line ending in this chunk, so buffer the input and continue.
+ self.buffer.append(lines[0])
+ return []
+
+ if self.buffer:
+ # Include any existing buffer in the first portion of the
+ # splitlines result.
+ lines = ["".join(self.buffer) + lines[0]] + lines[1:]
+ self.buffer = []
+
+ if not trailing_newline:
+ # If the last segment of splitlines is not newline terminated,
+ # then drop it from our output and start a new buffer.
+ self.buffer = [lines.pop()]
return lines
def flush(self) -> typing.List[str]:
- if self.buffer.endswith("\r"):
- # Handle the case where we had a trailing '\r', which could have
- # been a '\r\n' pair.
- lines = [self.buffer[:-1] + "\n"]
- elif self.buffer:
- lines = [self.buffer]
- else:
- lines = []
- self.buffer = ""
+ if not self.buffer and not self.trailing_cr:
+ return []
+
+ lines = ["".join(self.buffer)]
+ self.buffer = []
+ self.trailing_cr = False
return lines
def test_line_decoder_nl():
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\n\nb\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.decode("a\n\nb\nc") == ["a", "", "b"]
assert decoder.flush() == ["c"]
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\n\nb\nc\n") == ["a\n", "\n", "b\n", "c\n"]
+ assert decoder.decode("a\n\nb\nc\n") == ["a", "", "b", "c"]
assert decoder.flush() == []
# Issue #1033
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("12345\n") == ["12345\n"]
+ assert decoder.decode("12345\n") == ["12345"]
assert decoder.decode("foo ") == []
assert decoder.decode("bar ") == []
- assert decoder.decode("baz\n") == ["foo bar baz\n"]
+ assert decoder.decode("baz\n") == ["foo bar baz"]
assert decoder.flush() == []
def test_line_decoder_cr():
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\r\rb\rc") == ["a\n", "\n", "b\n"]
+ assert decoder.decode("a\r\rb\rc") == ["a", "", "b"]
assert decoder.flush() == ["c"]
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\r\rb\rc\r") == ["a\n", "\n", "b\n"]
- assert decoder.flush() == ["c\n"]
+ assert decoder.decode("a\r\rb\rc\r") == ["a", "", "b"]
+ assert decoder.flush() == ["c"]
# Issue #1033
decoder = LineDecoder()
assert decoder.decode("") == []
assert decoder.decode("12345\r") == []
- assert decoder.decode("foo ") == ["12345\n"]
+ assert decoder.decode("foo ") == ["12345"]
assert decoder.decode("bar ") == []
assert decoder.decode("baz\r") == []
- assert decoder.flush() == ["foo bar baz\n"]
+ assert decoder.flush() == ["foo bar baz"]
def test_line_decoder_crnl():
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\r\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.decode("a\r\n\r\nb\r\nc") == ["a", "", "b"]
assert decoder.flush() == ["c"]
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("a\r\n\r\nb\r\nc\r\n") == ["a\n", "\n", "b\n", "c\n"]
+ assert decoder.decode("a\r\n\r\nb\r\nc\r\n") == ["a", "", "b", "c"]
assert decoder.flush() == []
decoder = LineDecoder()
assert decoder.decode("") == []
assert decoder.decode("a\r") == []
- assert decoder.decode("\n\r\nb\r\nc") == ["a\n", "\n", "b\n"]
+ assert decoder.decode("\n\r\nb\r\nc") == ["a", "", "b"]
assert decoder.flush() == ["c"]
# Issue #1033
decoder = LineDecoder()
assert decoder.decode("") == []
- assert decoder.decode("12345\r\n") == ["12345\n"]
+ assert decoder.decode("12345\r\n") == ["12345"]
assert decoder.decode("foo ") == []
assert decoder.decode("bar ") == []
- assert decoder.decode("baz\r\n") == ["foo bar baz\n"]
+ assert decoder.decode("baz\r\n") == ["foo bar baz"]
assert decoder.flush() == []