subtests += 1
self.assertEqual(subtests, len(data2)) # All subtests ran?
+ def _capture_exception(self):
+ """Call run.print_exception() and return its stderr output."""
+ with captured_stderr() as output:
+ with mock.patch.object(run, 'cleanup_traceback') as ct:
+ ct.side_effect = lambda t, e: t
+ run.print_exception()
+ return output.getvalue()
+
+ @force_not_colorized
+ def test_print_exception_group_nested(self):
+ try:
+ try:
+ raise ExceptionGroup('inner', [ValueError('v1')])
+ except ExceptionGroup as inner:
+ raise ExceptionGroup('outer', [inner, TypeError('t1')])
+ except ExceptionGroup:
+ tb = self._capture_exception()
+
+ self.assertIn('ExceptionGroup: outer (2 sub-exceptions)', tb)
+ self.assertIn('ExceptionGroup: inner', tb)
+ self.assertIn('ValueError: v1', tb)
+ self.assertIn('TypeError: t1', tb)
+ # Verify tree structure characters.
+ self.assertIn('+-+---------------- 1 ----------------', tb)
+ self.assertIn('+---------------- 2 ----------------', tb)
+ self.assertIn('+------------------------------------', tb)
+
+ @force_not_colorized
+ def test_print_exception_group_chaining(self):
+ # __cause__ on a sub-exception exercises the prefixed
+ # chaining-message path (margin chars on separator lines).
+ sub = TypeError('t1')
+ sub.__cause__ = ValueError('original')
+ try:
+ raise ExceptionGroup('eg1', [sub])
+ except ExceptionGroup:
+ tb = self._capture_exception()
+ self.assertIn('ValueError: original', tb)
+ self.assertIn('| The above exception was the direct cause', tb)
+ self.assertIn('ExceptionGroup: eg1', tb)
+
+ # __context__ (implicit chaining) on a sub-exception.
+ sub = TypeError('t2')
+ sub.__context__ = ValueError('first')
+ try:
+ raise ExceptionGroup('eg2', [sub])
+ except ExceptionGroup:
+ tb = self._capture_exception()
+ self.assertIn('ValueError: first', tb)
+ self.assertIn('| During handling of the above exception', tb)
+ self.assertIn('ExceptionGroup: eg2', tb)
+
+ @force_not_colorized
+ def test_print_exception_group_seen(self):
+ shared = ValueError('shared')
+ try:
+ raise ExceptionGroup('eg', [shared, shared])
+ except ExceptionGroup:
+ tb = self._capture_exception()
+
+ self.assertIn('ValueError: shared', tb)
+ self.assertIn('<exception ValueError has printed>', tb)
+
+ @force_not_colorized
+ def test_print_exception_group_max_width(self):
+ excs = [ValueError(f'v{i}') for i in range(20)]
+ try:
+ raise ExceptionGroup('eg', excs)
+ except ExceptionGroup:
+ tb = self._capture_exception()
+
+ self.assertIn('+---------------- 15 ----------------', tb)
+ self.assertIn('+---------------- ... ----------------', tb)
+ self.assertIn('and 5 more exceptions', tb)
+ self.assertNotIn('+---------------- 16 ----------------', tb)
+
+ @force_not_colorized
+ def test_print_exception_group_max_depth(self):
+ def make_nested(depth):
+ if depth == 0:
+ return ValueError('leaf')
+ return ExceptionGroup(f'level{depth}',
+ [make_nested(depth - 1)])
+
+ try:
+ raise make_nested(15)
+ except ExceptionGroup:
+ tb = self._capture_exception()
+
+ self.assertIn('... (max_group_depth is 10)', tb)
+ self.assertIn('ExceptionGroup: level15', tb)
+ self.assertNotIn('ValueError: leaf', tb)
+
# StdioFile tests.
class S(str):
sys.last_type, sys.last_value, sys.last_traceback = excinfo
sys.last_exc = val
seen = set()
+ exclude = ("run.py", "rpc.py", "threading.py", "queue.py",
+ "debugger_r.py", "bdb.py")
+ max_group_width = 15
+ max_group_depth = 10
+ group_depth = 0
+
+ def print_exc_group(typ, exc, tb, prefix=""):
+ nonlocal group_depth
+ group_depth += 1
+ prefix2 = prefix or " "
+ if group_depth > max_group_depth:
+ print(f"{prefix2}| ... (max_group_depth is {max_group_depth})",
+ file=efile)
+ group_depth -= 1
+ return
+ if tb:
+ if not prefix:
+ print(" + Exception Group Traceback (most recent call last):", file=efile)
+ else:
+ print(f"{prefix}| Exception Group Traceback (most recent call last):", file=efile)
+ tbe = traceback.extract_tb(tb)
+ cleanup_traceback(tbe, exclude)
+ for line in traceback.format_list(tbe):
+ for subline in line.rstrip().splitlines():
+ print(f"{prefix2}| {subline}", file=efile)
+ lines = get_message_lines(typ, exc, tb)
+ for line in lines:
+ print(f"{prefix2}| {line}", end="", file=efile)
+ num_excs = len(exc.exceptions)
+ if num_excs <= max_group_width:
+ n = num_excs
+ else:
+ n = max_group_width + 1
+ for i, sub in enumerate(exc.exceptions[:n], 1):
+ truncated = (i > max_group_width)
+ first_line_pre = "+-" if i == 1 else " "
+ title = str(i) if not truncated else '...'
+ print(f"{prefix2}{first_line_pre}+---------------- {title} ----------------", file=efile)
+ if truncated:
+ remaining = num_excs - max_group_width
+ plural = 's' if remaining > 1 else ''
+ print(f"{prefix2} | and {remaining} more exception{plural}",
+ file=efile)
+ need_print_underline = True
+ elif id(sub) not in seen:
+ if not prefix:
+ print_exc(type(sub), sub, sub.__traceback__, " ")
+ else:
+ print_exc(type(sub), sub, sub.__traceback__, prefix + " ")
+ need_print_underline = not isinstance(sub, BaseExceptionGroup)
+ else:
+ print(f"{prefix2} | <exception {type(sub).__name__} has printed>", file=efile)
+ need_print_underline = True
+ if need_print_underline and i == n:
+ print(f"{prefix2} +------------------------------------", file=efile)
+ group_depth -= 1
- def print_exc(typ, exc, tb):
+ def print_exc(typ, exc, tb, prefix=""):
seen.add(id(exc))
context = exc.__context__
cause = exc.__cause__
+ prefix2 = f"{prefix}| " if prefix else ""
if cause is not None and id(cause) not in seen:
- print_exc(type(cause), cause, cause.__traceback__)
- print("\nThe above exception was the direct cause "
- "of the following exception:\n", file=efile)
+ print_exc(type(cause), cause, cause.__traceback__, prefix)
+ print(f"{prefix2}\n{prefix2}The above exception was the direct cause "
+ f"of the following exception:\n{prefix2}", file=efile)
elif (context is not None and
not exc.__suppress_context__ and
id(context) not in seen):
- print_exc(type(context), context, context.__traceback__)
- print("\nDuring handling of the above exception, "
- "another exception occurred:\n", file=efile)
- if tb:
- tbe = traceback.extract_tb(tb)
- print('Traceback (most recent call last):', file=efile)
- exclude = ("run.py", "rpc.py", "threading.py", "queue.py",
- "debugger_r.py", "bdb.py")
- cleanup_traceback(tbe, exclude)
- traceback.print_list(tbe, file=efile)
- lines = get_message_lines(typ, exc, tb)
- for line in lines:
- print(line, end='', file=efile)
+ print_exc(type(context), context, context.__traceback__, prefix)
+ print(f"{prefix2}\n{prefix2}During handling of the above exception, "
+ f"another exception occurred:\n{prefix2}", file=efile)
+ if isinstance(exc, BaseExceptionGroup):
+ print_exc_group(typ, exc, tb, prefix=prefix)
+ else:
+ if tb:
+ print(f"{prefix2}Traceback (most recent call last):", file=efile)
+ tbe = traceback.extract_tb(tb)
+ cleanup_traceback(tbe, exclude)
+ if prefix:
+ for line in traceback.format_list(tbe):
+ for subline in line.rstrip().splitlines():
+ print(f"{prefix}| {subline}", file=efile)
+ else:
+ traceback.print_list(tbe, file=efile)
+ lines = get_message_lines(typ, exc, tb)
+ for line in lines:
+ print(f"{prefix2}{line}", end="", file=efile)
print_exc(typ, val, tb)