assert False, f"forbidden message appeared in log {self.path}: {msg}"
+class LineReader:
+ """
+ >>> import io
+
+ >>> file = io.StringIO("complete line\\n")
+ >>> line_reader = LineReader(file)
+ >>> for line in line_reader.readlines():
+ ... print(line.strip())
+ complete line
+
+ >>> file = io.StringIO("complete line\\nand then incomplete line")
+ >>> line_reader = LineReader(file)
+ >>> for line in line_reader.readlines():
+ ... print(line.strip())
+ complete line
+
+ >>> file = io.StringIO("complete line\\nand then another complete line\\n")
+ >>> line_reader = LineReader(file)
+ >>> for line in line_reader.readlines():
+ ... print(line.strip())
+ complete line
+ and then another complete line
+
+ >>> file = io.StringIO()
+ >>> line_reader = LineReader(file)
+ >>> for chunk in (
+ ... "first line\\nsecond line\\nthi",
+ ... "rd ",
+ ... "line\\nfour",
+ ... "th line\\n\\nfifth line\\n"
+ ... ):
+ ... print("=== OUTER ITERATION ===")
+ ... pos = file.tell()
+ ... print(chunk, end="", file=file)
+ ... _ = file.seek(pos)
+ ... for line in line_reader.readlines():
+ ... print("--- inner iteration ---")
+ ... print(line.strip() or "<blank>")
+ === OUTER ITERATION ===
+ --- inner iteration ---
+ first line
+ --- inner iteration ---
+ second line
+ === OUTER ITERATION ===
+ === OUTER ITERATION ===
+ --- inner iteration ---
+ third line
+ === OUTER ITERATION ===
+ --- inner iteration ---
+ fourth line
+ --- inner iteration ---
+ <blank>
+ --- inner iteration ---
+ fifth line
+ """
+
+ def __init__(self, stream: TextIO):
+ self._stream = stream
+ self._linebuf = ""
+
+ def readline(self) -> Optional[str]:
+ """
+ Wrapper around io.readline() function to handle unfinished lines.
+
+ If a line ends with newline character, it's returned immediately.
+ If a line doesn't end with a newline character, the read contents are
+ buffered until the next call of this function and None is returned
+ instead.
+ """
+ read = self._stream.readline()
+ if not read.endswith("\n"):
+ self._linebuf += read
+ return None
+ read = self._linebuf + read
+ self._linebuf = ""
+ return read
+
+ def readlines(self) -> Iterator[str]:
+ """
+ Wrapper around io.readline() which only returns finished lines.
+ """
+ while True:
+ line = self.readline()
+ if line is None:
+ return
+ yield line
+
+
class WatchLog(abc.ABC):
"""
Wait for a log message to appear in a text file.
...
isctest.log.watchlog.WatchLogException: timeout must be greater than 0
"""
- self._fd = None # type: Optional[TextIO]
+ self._fd: Optional[TextIO] = None
+ self._reader: Optional[LineReader] = None
self._path = path
self._wait_function_called = False
- self._linebuf = ""
if timeout <= 0.0:
raise WatchLogException("timeout must be greater than 0")
self._timeout = timeout
self._deadline = 0.0
- def _readline(self) -> Optional[str]:
- """
- Wrapper around io.readline() function to handle unfinished lines.
-
- If a line ends with newline character, it's returned immediately.
- If a line doesn't end with a newline character, the read contents are
- buffered until the next call of this function and None is returned
- instead.
- """
- if not self._fd:
- raise WatchLogException("file to watch isn't open")
- read = self._fd.readline()
- if not read.endswith("\n"):
- self._linebuf += read
- return None
- read = self._linebuf + read
- self._linebuf = ""
- return read
-
- def _readlines(self) -> Iterator[str]:
- """
- Wrapper around io.readline() which only returns finished lines.
- """
- while True:
- line = self._readline()
- if line is None:
- return
- yield line
-
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]:
self._wait_function_called = True
self._deadline = time.monotonic() + self._timeout
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
+ if not self._reader:
+ raise WatchLogException(
+ "use WatchLog as context manager before calling wait_for_*() functions"
+ )
while time.monotonic() < self._deadline:
- for line in self._readlines():
+ for line in self._reader.readlines():
for regex in regexes:
match = regex.search(line)
if match:
def __enter__(self) -> Any:
self._fd = open(self._path, encoding="utf-8")
self._seek_on_enter()
+ self._reader = LineReader(self._fd)
return self
@abc.abstractmethod
def __exit__(self, *_: Any) -> None:
if not self._wait_function_called:
raise WatchLogException("wait_for_*() was not called")
- if self._fd:
- self._fd.close()
+ self._reader = None
+ assert self._fd
+ self._fd.close()
class WatchLogFromStart(WatchLog):
"""
def _seek_on_enter(self) -> None:
- if self._fd:
- self._fd.seek(0, os.SEEK_END)
+ assert self._fd
+ self._fd.seek(0, os.SEEK_END)