for line in self.logcat_process.stdout:
self.logcat_queue.put(line.rstrip("\n"))
self.logcat_process.stdout.close()
+
self.logcat_thread = Thread(target=logcat_thread)
self.logcat_thread.start()
- from ctypes import CDLL, c_char_p, c_int
- android_log_write = getattr(CDLL("liblog.so"), "__android_log_write")
- android_log_write.argtypes = (c_int, c_char_p, c_char_p)
- ANDROID_LOG_INFO = 4
-
- # Separate tests using a marker line with a different tag.
- tag, message = "python.test", f"{self.id()} {time()}"
- android_log_write(
- ANDROID_LOG_INFO, tag.encode("UTF-8"), message.encode("UTF-8"))
- self.assert_log("I", tag, message, skip=True, timeout=5)
+ try:
+ from ctypes import CDLL, c_char_p, c_int
+ android_log_write = getattr(CDLL("liblog.so"), "__android_log_write")
+ android_log_write.argtypes = (c_int, c_char_p, c_char_p)
+ ANDROID_LOG_INFO = 4
+
+ # Separate tests using a marker line with a different tag.
+ tag, message = "python.test", f"{self.id()} {time()}"
+ android_log_write(
+ ANDROID_LOG_INFO, tag.encode("UTF-8"), message.encode("UTF-8"))
+ self.assert_log("I", tag, message, skip=True)
+ except:
+ # If setUp throws an exception, tearDown is not automatically
+ # called. Avoid leaving a dangling thread which would keep the
+ # Python process alive indefinitely.
+ self.tearDown()
+ raise
def assert_logs(self, level, tag, expected, **kwargs):
for line in expected:
self.assert_log(level, tag, line, **kwargs)
- def assert_log(self, level, tag, expected, *, skip=False, timeout=0.5):
- deadline = time() + timeout
+ def assert_log(self, level, tag, expected, *, skip=False):
+ deadline = time() + LOOPBACK_TIMEOUT
while True:
try:
line = self.logcat_queue.get(timeout=(deadline - time()))
except queue.Empty:
- self.fail(f"line not found: {expected!r}")
+ raise self.failureException(
+ f"line not found: {expected!r}"
+ ) from None
if match := re.fullmatch(fr"(.)/{tag}: (.*)", line):
try:
self.assertEqual(level, match[1])
self.logcat_process.wait(LOOPBACK_TIMEOUT)
self.logcat_thread.join(LOOPBACK_TIMEOUT)
+ # Avoid an irrelevant warning about threading._dangling.
+ self.logcat_thread = None
+
@contextmanager
def unbuffered(self, stream):
stream.reconfigure(write_through=True)