E = sys.monitoring.events
all_events = 0
for event, cb_name in self.EVENT_CALLBACK_MAP.items():
- callback = getattr(self, f'{cb_name}_callback')
+ callback = self.callback_wrapper(getattr(self, f'{cb_name}_callback'), event)
sys.monitoring.register_callback(self._tool_id, event, callback)
if event != E.INSTRUCTION:
all_events |= event
if sys.monitoring.get_tool(self._tool_id) == self._name:
sys.monitoring.restart_events()
- def callback_wrapper(func):
+ def callback_wrapper(self, func, event):
import functools
@functools.wraps(func)
- def wrapper(self, *args):
+ def wrapper(*args):
if self._tracing_thread != threading.current_thread():
return
try:
frame = sys._getframe().f_back
- ret = func(self, frame, *args)
+ ret = func(frame, *args)
if self._enabled and frame.f_trace:
self.update_local_events()
- if self._disable_current_event:
+ if (
+ self._disable_current_event
+ and event not in (E.PY_THROW, E.PY_UNWIND, E.RAISE)
+ ):
return sys.monitoring.DISABLE
else:
return ret
return wrapper
- @callback_wrapper
def call_callback(self, frame, code, *args):
local_tracefunc = self._tracefunc(frame, 'call', None)
if local_tracefunc is not None:
if self._enabled:
sys.monitoring.set_local_events(self._tool_id, code, self.LOCAL_EVENTS)
- @callback_wrapper
def return_callback(self, frame, code, offset, retval):
if frame.f_trace:
frame.f_trace(frame, 'return', retval)
- @callback_wrapper
def unwind_callback(self, frame, code, *args):
if frame.f_trace:
frame.f_trace(frame, 'return', None)
- @callback_wrapper
def line_callback(self, frame, code, *args):
if frame.f_trace and frame.f_trace_lines:
frame.f_trace(frame, 'line', None)
- @callback_wrapper
def jump_callback(self, frame, code, inst_offset, dest_offset):
if dest_offset > inst_offset:
return sys.monitoring.DISABLE
if frame.f_trace and frame.f_trace_lines:
frame.f_trace(frame, 'line', None)
- @callback_wrapper
def exception_callback(self, frame, code, offset, exc):
if frame.f_trace:
if exc.__traceback__ and hasattr(exc.__traceback__, 'tb_frame'):
tb = tb.tb_next
frame.f_trace(frame, 'exception', (type(exc), exc, exc.__traceback__))
- @callback_wrapper
def opcode_callback(self, frame, code, offset):
if frame.f_trace and frame.f_trace_opcodes:
frame.f_trace(frame, 'opcode', None)
(Pdb) continue
"""
+def test_pdb_breakpoint_with_throw():
+ """GH-132536: PY_THROW event should not be turned off
+
+ >>> reset_Breakpoint()
+
+ >>> def gen():
+ ... yield 0
+
+ >>> def test_function():
+ ... import pdb; pdb.Pdb(nosigint=True, readrc=False).set_trace()
+ ... g = gen()
+ ... try:
+ ... g.throw(TypeError)
+ ... except TypeError:
+ ... pass
+
+ >>> with PdbTestInput([
+ ... 'b 7',
+ ... 'continue',
+ ... 'clear 1',
+ ... 'continue',
+ ... ]):
+ ... test_function()
+ > <doctest test.test_pdb.test_pdb_breakpoint_with_throw[2]>(2)test_function()
+ -> import pdb; pdb.Pdb(nosigint=True, readrc=False).set_trace()
+ (Pdb) b 7
+ Breakpoint 1 at <doctest test.test_pdb.test_pdb_breakpoint_with_throw[2]>:7
+ (Pdb) continue
+ > <doctest test.test_pdb.test_pdb_breakpoint_with_throw[2]>(7)test_function()
+ -> pass
+ (Pdb) clear 1
+ Deleted breakpoint 1 at <doctest test.test_pdb.test_pdb_breakpoint_with_throw[2]>:7
+ (Pdb) continue
+ """
+
def test_pdb_multiline_statement():
"""Test for multiline statement