import dis
import code
import glob
+import json
import token
import types
import codeop
import pprint
import signal
+import socket
+import typing
import asyncio
import inspect
+import weakref
+import builtins
+import tempfile
import textwrap
import tokenize
import itertools
import linecache
import _colorize
+from contextlib import closing
from contextlib import contextmanager
from rlcompleter import Completer
from types import CodeType
if cmd == 'end':
return True # end of cmd list
elif cmd == 'EOF':
- print('')
+ self.message('')
return True # end of cmd list
cmdlist = self.commands[self.commands_bnum]
if cmd == 'silent':
complete_ignore = _complete_bpnumber
+ def _prompt_for_confirmation(self, prompt, default):
+ try:
+ reply = input(prompt)
+ except EOFError:
+ reply = default
+ return reply.strip().lower()
+
def do_clear(self, arg):
"""cl(ear) [filename:lineno | bpnumber ...]
clear all breaks at that line in that file.
"""
if not arg:
- try:
- reply = input('Clear all breaks? ')
- except EOFError:
- reply = 'no'
- reply = reply.strip().lower()
+ reply = self._prompt_for_confirmation(
+ 'Clear all breaks? ',
+ default='no',
+ )
if reply in ('y', 'yes'):
bplist = [bp for bp in bdb.Breakpoint.bpbynumber if bp]
self.clear_all_breaks()
self.error('Jump failed: %s' % e)
do_j = do_jump
+ def _create_recursive_debugger(self):
+ return Pdb(self.completekey, self.stdin, self.stdout)
+
def do_debug(self, arg):
"""debug code
self.stop_trace()
globals = self.curframe.f_globals
locals = self.curframe.f_locals
- p = Pdb(self.completekey, self.stdin, self.stdout)
+ p = self._create_recursive_debugger()
p.prompt = "(%s) " % self.prompt.strip()
self.message("ENTERING RECURSIVE DEBUGGER")
try:
pdb.message(header)
pdb.set_trace(sys._getframe().f_back, commands=commands)
+# Remote PDB
+
+class _PdbServer(Pdb):
+ def __init__(self, sockfile, owns_sockfile=True, **kwargs):
+ self._owns_sockfile = owns_sockfile
+ self._interact_state = None
+ self._sockfile = sockfile
+ self._command_name_cache = []
+ self._write_failed = False
+ super().__init__(**kwargs)
+
+ @staticmethod
+ def protocol_version():
+ # By default, assume a client and server are compatible if they run
+ # the same Python major.minor version. We'll try to keep backwards
+ # compatibility between patch versions of a minor version if possible.
+ # If we do need to change the protocol in a patch version, we'll change
+ # `revision` to the patch version where the protocol changed.
+ # We can ignore compatibility for pre-release versions; sys.remote_exec
+ # can't attach to a pre-release version except from that same version.
+ v = sys.version_info
+ revision = 0
+ return int(f"{v.major:02X}{v.minor:02X}{revision:02X}F0", 16)
+
+ def _ensure_valid_message(self, msg):
+ # Ensure the message conforms to our protocol.
+ # If anything needs to be changed here for a patch release of Python,
+ # the 'revision' in protocol_version() should be updated.
+ match msg:
+ case {"message": str(), "type": str()}:
+ # Have the client show a message. The client chooses how to
+ # format the message based on its type. The currently defined
+ # types are "info" and "error". If a message has a type the
+ # client doesn't recognize, it must be treated as "info".
+ pass
+ case {"help": str()}:
+ # Have the client show the help for a given argument.
+ pass
+ case {"prompt": str(), "state": str()}:
+ # Have the client display the given prompt and wait for a reply
+ # from the user. If the client recognizes the state it may
+ # enable mode-specific features like multi-line editing.
+ # If it doesn't recognize the state it must prompt for a single
+ # line only and send it directly to the server. A server won't
+ # progress until it gets a "reply" or "signal" message, but can
+ # process "complete" requests while waiting for the reply.
+ pass
+ case {
+ "completions": list(completions)
+ } if all(isinstance(c, str) for c in completions):
+ # Return valid completions for a client's "complete" request.
+ pass
+ case {
+ "command_list": list(command_list)
+ } if all(isinstance(c, str) for c in command_list):
+ # Report the list of legal PDB commands to the client.
+ # Due to aliases this list is not static, but the client
+ # needs to know it for multi-line editing.
+ pass
+ case _:
+ raise AssertionError(
+ f"PDB message doesn't follow the schema! {msg}"
+ )
+
+ def _send(self, **kwargs):
+ self._ensure_valid_message(kwargs)
+ json_payload = json.dumps(kwargs)
+ try:
+ self._sockfile.write(json_payload.encode() + b"\n")
+ self._sockfile.flush()
+ except OSError:
+ # This means that the client has abruptly disconnected, but we'll
+ # handle that the next time we try to read from the client instead
+ # of trying to handle it from everywhere _send() may be called.
+ # Track this with a flag rather than assuming readline() will ever
+ # return an empty string because the socket may be half-closed.
+ self._write_failed = True
+
+ @typing.override
+ def message(self, msg, end="\n"):
+ self._send(message=str(msg) + end, type="info")
+
+ @typing.override
+ def error(self, msg):
+ self._send(message=str(msg), type="error")
+
+ def _get_input(self, prompt, state) -> str:
+ # Before displaying a (Pdb) prompt, send the list of PDB commands
+ # unless we've already sent an up-to-date list.
+ if state == "pdb" and not self._command_name_cache:
+ self._command_name_cache = self.completenames("", "", 0, 0)
+ self._send(command_list=self._command_name_cache)
+ self._send(prompt=prompt, state=state)
+ return self._read_reply()
+
+ def _read_reply(self):
+ # Loop until we get a 'reply' or 'signal' from the client,
+ # processing out-of-band 'complete' requests as they arrive.
+ while True:
+ if self._write_failed:
+ raise EOFError
+
+ msg = self._sockfile.readline()
+ if not msg:
+ raise EOFError
+
+ try:
+ payload = json.loads(msg)
+ except json.JSONDecodeError:
+ self.error(f"Disconnecting: client sent invalid JSON {msg}")
+ raise EOFError
+
+ match payload:
+ case {"reply": str(reply)}:
+ return reply
+ case {"signal": str(signal)}:
+ if signal == "INT":
+ raise KeyboardInterrupt
+ elif signal == "EOF":
+ raise EOFError
+ else:
+ self.error(
+ f"Received unrecognized signal: {signal}"
+ )
+ # Our best hope of recovering is to pretend we
+ # got an EOF to exit whatever mode we're in.
+ raise EOFError
+ case {
+ "complete": {
+ "text": str(text),
+ "line": str(line),
+ "begidx": int(begidx),
+ "endidx": int(endidx),
+ }
+ }:
+ items = self._complete_any(text, line, begidx, endidx)
+ self._send(completions=items)
+ continue
+ # Valid JSON, but doesn't meet the schema.
+ self.error(f"Ignoring invalid message from client: {msg}")
+
+ def _complete_any(self, text, line, begidx, endidx):
+ if begidx == 0:
+ return self.completenames(text, line, begidx, endidx)
+
+ cmd = self.parseline(line)[0]
+ if cmd:
+ compfunc = getattr(self, "complete_" + cmd, self.completedefault)
+ else:
+ compfunc = self.completedefault
+ return compfunc(text, line, begidx, endidx)
+
+ def cmdloop(self, intro=None):
+ self.preloop()
+ if intro is not None:
+ self.intro = intro
+ if self.intro:
+ self.message(str(self.intro))
+ stop = None
+ while not stop:
+ if self._interact_state is not None:
+ try:
+ reply = self._get_input(prompt=">>> ", state="interact")
+ except KeyboardInterrupt:
+ # Match how KeyboardInterrupt is handled in a REPL
+ self.message("\nKeyboardInterrupt")
+ except EOFError:
+ self.message("\n*exit from pdb interact command*")
+ self._interact_state = None
+ else:
+ self._run_in_python_repl(reply)
+ continue
+
+ if not self.cmdqueue:
+ try:
+ state = "commands" if self.commands_defining else "pdb"
+ reply = self._get_input(prompt=self.prompt, state=state)
+ except EOFError:
+ reply = "EOF"
+
+ self.cmdqueue.append(reply)
+
+ line = self.cmdqueue.pop(0)
+ line = self.precmd(line)
+ stop = self.onecmd(line)
+ stop = self.postcmd(stop, line)
+ self.postloop()
+
+ def postloop(self):
+ super().postloop()
+ if self.quitting:
+ self.detach()
+
+ def detach(self):
+ # Detach the debugger and close the socket without raising BdbQuit
+ self.quitting = False
+ if self._owns_sockfile:
+ # Don't try to reuse this instance, it's not valid anymore.
+ Pdb._last_pdb_instance = None
+ try:
+ self._sockfile.close()
+ except OSError:
+ # close() can fail if the connection was broken unexpectedly.
+ pass
+
+ def do_debug(self, arg):
+ # Clear our cached list of valid commands; the recursive debugger might
+ # send its own differing list, and so ours needs to be re-sent.
+ self._command_name_cache = []
+ return super().do_debug(arg)
+
+ def do_alias(self, arg):
+ # Clear our cached list of valid commands; one might be added.
+ self._command_name_cache = []
+ return super().do_alias(arg)
+
+ def do_unalias(self, arg):
+ # Clear our cached list of valid commands; one might be removed.
+ self._command_name_cache = []
+ return super().do_unalias(arg)
+
+ def do_help(self, arg):
+ # Tell the client to render the help, since it might need a pager.
+ self._send(help=arg)
+
+ do_h = do_help
+
+ def _interact_displayhook(self, obj):
+ # Like the default `sys.displayhook` except sending a socket message.
+ if obj is not None:
+ self.message(repr(obj))
+ builtins._ = obj
+
+ def _run_in_python_repl(self, lines):
+ # Run one 'interact' mode code block against an existing namespace.
+ assert self._interact_state
+ save_displayhook = sys.displayhook
+ try:
+ sys.displayhook = self._interact_displayhook
+ code_obj = self._interact_state["compiler"](lines + "\n")
+ if code_obj is None:
+ raise SyntaxError("Incomplete command")
+ exec(code_obj, self._interact_state["ns"])
+ except:
+ self._error_exc()
+ finally:
+ sys.displayhook = save_displayhook
+
+ def do_interact(self, arg):
+ # Prepare to run 'interact' mode code blocks, and trigger the client
+ # to start treating all input as Python commands, not PDB ones.
+ self.message("*pdb interact start*")
+ self._interact_state = dict(
+ compiler=codeop.CommandCompiler(),
+ ns={**self.curframe.f_globals, **self.curframe.f_locals},
+ )
+
+ @typing.override
+ def _create_recursive_debugger(self):
+ return _PdbServer(self._sockfile, owns_sockfile=False)
+
+ @typing.override
+ def _prompt_for_confirmation(self, prompt, default):
+ try:
+ return self._get_input(prompt=prompt, state="confirm")
+ except (EOFError, KeyboardInterrupt):
+ return default
+
+ def do_run(self, arg):
+ self.error("remote PDB cannot restart the program")
+
+ do_restart = do_run
+
+ def _error_exc(self):
+ if self._interact_state and isinstance(sys.exception(), SystemExit):
+ # If we get a SystemExit in 'interact' mode, exit the REPL.
+ self._interact_state = None
+ ret = super()._error_exc()
+ self.message("*exit from pdb interact command*")
+ return ret
+ else:
+ return super()._error_exc()
+
+ def default(self, line):
+ # Unlike Pdb, don't prompt for more lines of a multi-line command.
+ # The remote needs to send us the whole block in one go.
+ try:
+ candidate = line.removeprefix("!") + "\n"
+ if codeop.compile_command(candidate, "<stdin>", "single") is None:
+ raise SyntaxError("Incomplete command")
+ return super().default(candidate)
+ except:
+ self._error_exc()
+
+
+class _PdbClient:
+ def __init__(self, pid, sockfile, interrupt_script):
+ self.pid = pid
+ self.sockfile = sockfile
+ self.interrupt_script = interrupt_script
+ self.pdb_instance = Pdb()
+ self.pdb_commands = set()
+ self.completion_matches = []
+ self.state = "dumb"
+ self.write_failed = False
+
+ def _ensure_valid_message(self, msg):
+ # Ensure the message conforms to our protocol.
+ # If anything needs to be changed here for a patch release of Python,
+ # the 'revision' in protocol_version() should be updated.
+ match msg:
+ case {"reply": str()}:
+ # Send input typed by a user at a prompt to the remote PDB.
+ pass
+ case {"signal": "EOF"}:
+ # Tell the remote PDB that the user pressed ^D at a prompt.
+ pass
+ case {"signal": "INT"}:
+ # Tell the remote PDB that the user pressed ^C at a prompt.
+ pass
+ case {
+ "complete": {
+ "text": str(),
+ "line": str(),
+ "begidx": int(),
+ "endidx": int(),
+ }
+ }:
+ # Ask the remote PDB what completions are valid for the given
+ # parameters, using readline's completion protocol.
+ pass
+ case _:
+ raise AssertionError(
+ f"PDB message doesn't follow the schema! {msg}"
+ )
+
+ def _send(self, **kwargs):
+ self._ensure_valid_message(kwargs)
+ json_payload = json.dumps(kwargs)
+ try:
+ self.sockfile.write(json_payload.encode() + b"\n")
+ self.sockfile.flush()
+ except OSError:
+ # This means that the client has abruptly disconnected, but we'll
+ # handle that the next time we try to read from the client instead
+ # of trying to handle it from everywhere _send() may be called.
+ # Track this with a flag rather than assuming readline() will ever
+ # return an empty string because the socket may be half-closed.
+ self.write_failed = True
+
+ def read_command(self, prompt):
+ reply = input(prompt)
+
+ if self.state == "dumb":
+ # No logic applied whatsoever, just pass the raw reply back.
+ return reply
+
+ prefix = ""
+ if self.state == "pdb":
+ # PDB command entry mode
+ cmd = self.pdb_instance.parseline(reply)[0]
+ if cmd in self.pdb_commands or reply.strip() == "":
+ # Recognized PDB command, or blank line repeating last command
+ return reply
+
+ # Otherwise, explicit or implicit exec command
+ if reply.startswith("!"):
+ prefix = "!"
+ reply = reply.removeprefix(prefix).lstrip()
+
+ if codeop.compile_command(reply + "\n", "<stdin>", "single") is not None:
+ # Valid single-line statement
+ return prefix + reply
+
+ # Otherwise, valid first line of a multi-line statement
+ continue_prompt = "...".ljust(len(prompt))
+ while codeop.compile_command(reply, "<stdin>", "single") is None:
+ reply += "\n" + input(continue_prompt)
+
+ return prefix + reply
+
+ @contextmanager
+ def readline_completion(self, completer):
+ try:
+ import readline
+ except ImportError:
+ yield
+ return
+
+ old_completer = readline.get_completer()
+ try:
+ readline.set_completer(completer)
+ if readline.backend == "editline":
+ # libedit uses "^I" instead of "tab"
+ command_string = "bind ^I rl_complete"
+ else:
+ command_string = "tab: complete"
+ readline.parse_and_bind(command_string)
+ yield
+ finally:
+ readline.set_completer(old_completer)
+
+ def cmdloop(self):
+ with self.readline_completion(self.complete):
+ while not self.write_failed:
+ try:
+ if not (payload_bytes := self.sockfile.readline()):
+ break
+ except KeyboardInterrupt:
+ self.send_interrupt()
+ continue
+
+ try:
+ payload = json.loads(payload_bytes)
+ except json.JSONDecodeError:
+ print(
+ f"*** Invalid JSON from remote: {payload_bytes}",
+ flush=True,
+ )
+ continue
+
+ self.process_payload(payload)
+
+ def send_interrupt(self):
+ print(
+ "\n*** Program will stop at the next bytecode instruction."
+ " (Use 'cont' to resume)."
+ )
+ sys.remote_exec(self.pid, self.interrupt_script)
+
+ def process_payload(self, payload):
+ match payload:
+ case {
+ "command_list": command_list
+ } if all(isinstance(c, str) for c in command_list):
+ self.pdb_commands = set(command_list)
+ case {"message": str(msg), "type": str(msg_type)}:
+ if msg_type == "error":
+ print("***", msg, flush=True)
+ else:
+ print(msg, end="", flush=True)
+ case {"help": str(arg)}:
+ self.pdb_instance.do_help(arg)
+ case {"prompt": str(prompt), "state": str(state)}:
+ if state not in ("pdb", "interact"):
+ state = "dumb"
+ self.state = state
+ self.prompt_for_reply(prompt)
+ case _:
+ raise RuntimeError(f"Unrecognized payload {payload}")
+
+ def prompt_for_reply(self, prompt):
+ while True:
+ try:
+ payload = {"reply": self.read_command(prompt)}
+ except EOFError:
+ payload = {"signal": "EOF"}
+ except KeyboardInterrupt:
+ payload = {"signal": "INT"}
+ except Exception as exc:
+ msg = traceback.format_exception_only(exc)[-1].strip()
+ print("***", msg, flush=True)
+ continue
+
+ self._send(**payload)
+ return
+
+ def complete(self, text, state):
+ import readline
+
+ if state == 0:
+ self.completion_matches = []
+ if self.state not in ("pdb", "interact"):
+ return None
+
+ origline = readline.get_line_buffer()
+ line = origline.lstrip()
+ stripped = len(origline) - len(line)
+ begidx = readline.get_begidx() - stripped
+ endidx = readline.get_endidx() - stripped
+
+ msg = {
+ "complete": {
+ "text": text,
+ "line": line,
+ "begidx": begidx,
+ "endidx": endidx,
+ }
+ }
+
+ self._send(**msg)
+ if self.write_failed:
+ return None
+
+ payload = self.sockfile.readline()
+ if not payload:
+ return None
+
+ payload = json.loads(payload)
+ if "completions" not in payload:
+ raise RuntimeError(
+ f"Failed to get valid completions. Got: {payload}"
+ )
+
+ self.completion_matches = payload["completions"]
+ try:
+ return self.completion_matches[state]
+ except IndexError:
+ return None
+
+
+def _connect(host, port, frame, commands, version):
+ with closing(socket.create_connection((host, port))) as conn:
+ sockfile = conn.makefile("rwb")
+
+ remote_pdb = _PdbServer(sockfile)
+ weakref.finalize(remote_pdb, sockfile.close)
+
+ if Pdb._last_pdb_instance is not None:
+ remote_pdb.error("Another PDB instance is already attached.")
+ elif version != remote_pdb.protocol_version():
+ target_ver = f"0x{remote_pdb.protocol_version():08X}"
+ attach_ver = f"0x{version:08X}"
+ remote_pdb.error(
+ f"The target process is running a Python version that is"
+ f" incompatible with this PDB module."
+ f"\nTarget process pdb protocol version: {target_ver}"
+ f"\nLocal pdb module's protocol version: {attach_ver}"
+ )
+ else:
+ remote_pdb.rcLines.extend(commands.splitlines())
+ remote_pdb.set_trace(frame=frame)
+
+
+def attach(pid, commands=()):
+ """Attach to a running process with the given PID."""
+ with closing(socket.create_server(("localhost", 0))) as server:
+ port = server.getsockname()[1]
+
+ with tempfile.NamedTemporaryFile("w", delete_on_close=False) as connect_script:
+ connect_script.write(
+ textwrap.dedent(
+ f"""
+ import pdb, sys
+ pdb._connect(
+ host="localhost",
+ port={port},
+ frame=sys._getframe(1),
+ commands={json.dumps("\n".join(commands))},
+ version={_PdbServer.protocol_version()},
+ )
+ """
+ )
+ )
+ connect_script.close()
+ sys.remote_exec(pid, connect_script.name)
+
+ # TODO Add a timeout? Or don't bother since the user can ^C?
+ client_sock, _ = server.accept()
+
+ with closing(client_sock):
+ sockfile = client_sock.makefile("rwb")
+
+ with closing(sockfile):
+ with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
+ interrupt_script.write(
+ 'import pdb, sys\n'
+ 'if inst := pdb.Pdb._last_pdb_instance:\n'
+ ' inst.set_trace(sys._getframe(1))\n'
+ )
+ interrupt_script.close()
+
+ _PdbClient(pid, sockfile, interrupt_script.name).cmdloop()
+
+
# Post-Mortem interface
def post_mortem(t=None):
def main():
import argparse
- parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | pyfile) [args ...]",
+ parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | -p pid | pyfile) [args ...]",
description=_usage,
formatter_class=argparse.RawDescriptionHelpFormatter,
allow_abbrev=False)
parser.add_argument('-c', '--command', action='append', default=[], metavar='command', dest='commands',
help='pdb commands to execute as if given in a .pdbrc file')
parser.add_argument('-m', metavar='module', dest='module')
+ parser.add_argument('-p', '--pid', type=int, help="attach to the specified PID", default=None)
if len(sys.argv) == 1:
# If no arguments were given (python -m pdb), print the whole help message.
opts, args = parser.parse_known_args()
- if opts.module:
+ if opts.pid:
+ # If attaching to a remote pid, unrecognized arguments are not allowed.
+ # This will raise an error if there are extra unrecognized arguments.
+ opts = parser.parse_args()
+ if opts.module:
+ parser.error("argument -m: not allowed with argument --pid")
+ attach(opts.pid, opts.commands)
+ return
+ elif opts.module:
# If a module is being debugged, we consider the arguments after "-m module" to
# be potential arguments to the module itself. We need to parse the arguments
# before "-m" to check if there is any invalid argument.
--- /dev/null
+import io
+import json
+import os
+import signal
+import socket
+import subprocess
+import sys
+import tempfile
+import textwrap
+import threading
+import unittest
+import unittest.mock
+from contextlib import contextmanager
+from pathlib import Path
+from test.support import is_wasi, os_helper
+from test.support.os_helper import temp_dir, TESTFN, unlink
+from typing import Dict, List, Optional, Tuple, Union, Any
+
+import pdb
+from pdb import _PdbServer, _PdbClient
+
+
+class MockSocketFile:
+ """Mock socket file for testing _PdbServer without actual socket connections."""
+
+ def __init__(self):
+ self.input_queue = []
+ self.output_buffer = []
+
+ def write(self, data: bytes) -> None:
+ """Simulate write to socket."""
+ self.output_buffer.append(data)
+
+ def flush(self) -> None:
+ """No-op flush implementation."""
+ pass
+
+ def readline(self) -> bytes:
+ """Read a line from the prepared input queue."""
+ if not self.input_queue:
+ return b""
+ return self.input_queue.pop(0)
+
+ def close(self) -> None:
+ """Close the mock socket file."""
+ pass
+
+ def add_input(self, data: dict) -> None:
+ """Add input that will be returned by readline."""
+ self.input_queue.append(json.dumps(data).encode() + b"\n")
+
+ def get_output(self) -> List[dict]:
+ """Get the output that was written by the object being tested."""
+ results = []
+ for data in self.output_buffer:
+ if isinstance(data, bytes) and data.endswith(b"\n"):
+ try:
+ results.append(json.loads(data.decode().strip()))
+ except json.JSONDecodeError:
+ pass # Ignore non-JSON output
+ self.output_buffer = []
+ return results
+
+
+class RemotePdbTestCase(unittest.TestCase):
+ """Tests for the _PdbServer class."""
+
+ def setUp(self):
+ self.sockfile = MockSocketFile()
+ self.pdb = _PdbServer(self.sockfile)
+
+ # Mock some Bdb attributes that are lazily created when tracing starts
+ self.pdb.botframe = None
+ self.pdb.quitting = False
+
+ # Create a frame for testing
+ self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
+ self.test_locals = {'c': 3, 'd': 4}
+
+ # Create a simple test frame
+ frame_info = unittest.mock.Mock()
+ frame_info.f_globals = self.test_globals
+ frame_info.f_locals = self.test_locals
+ frame_info.f_lineno = 42
+ frame_info.f_code = unittest.mock.Mock()
+ frame_info.f_code.co_filename = "test_file.py"
+ frame_info.f_code.co_name = "test_function"
+
+ self.pdb.curframe = frame_info
+
+ def test_message_and_error(self):
+ """Test message and error methods send correct JSON."""
+ self.pdb.message("Test message")
+ self.pdb.error("Test error")
+
+ outputs = self.sockfile.get_output()
+ self.assertEqual(len(outputs), 2)
+ self.assertEqual(outputs[0], {"message": "Test message\n", "type": "info"})
+ self.assertEqual(outputs[1], {"message": "Test error", "type": "error"})
+
+ def test_read_command(self):
+ """Test reading commands from the socket."""
+ # Add test input
+ self.sockfile.add_input({"reply": "help"})
+
+ # Read the command
+ cmd = self.pdb._read_reply()
+ self.assertEqual(cmd, "help")
+
+ def test_read_command_EOF(self):
+ """Test reading EOF command."""
+ # Simulate socket closure
+ self.pdb._write_failed = True
+ with self.assertRaises(EOFError):
+ self.pdb._read_reply()
+
+ def test_completion(self):
+ """Test handling completion requests."""
+ # Mock completenames to return specific values
+ with unittest.mock.patch.object(self.pdb, 'completenames',
+ return_value=["continue", "clear"]):
+
+ # Add a completion request
+ self.sockfile.add_input({
+ "complete": {
+ "text": "c",
+ "line": "c",
+ "begidx": 0,
+ "endidx": 1
+ }
+ })
+
+ # Add a regular command to break the loop
+ self.sockfile.add_input({"reply": "help"})
+
+ # Read command - this should process the completion request first
+ cmd = self.pdb._read_reply()
+
+ # Verify completion response was sent
+ outputs = self.sockfile.get_output()
+ self.assertEqual(len(outputs), 1)
+ self.assertEqual(outputs[0], {"completions": ["continue", "clear"]})
+
+ # The actual command should be returned
+ self.assertEqual(cmd, "help")
+
+ def test_do_help(self):
+ """Test that do_help sends the help message."""
+ self.pdb.do_help("break")
+
+ outputs = self.sockfile.get_output()
+ self.assertEqual(len(outputs), 1)
+ self.assertEqual(outputs[0], {"help": "break"})
+
+ def test_interact_mode(self):
+ """Test interaction mode setup and execution."""
+ # First set up interact mode
+ self.pdb.do_interact("")
+
+ # Verify _interact_state is properly initialized
+ self.assertIsNotNone(self.pdb._interact_state)
+ self.assertIsInstance(self.pdb._interact_state, dict)
+
+ # Test running code in interact mode
+ with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error:
+ self.pdb._run_in_python_repl("print('test')")
+ mock_error.assert_not_called()
+
+ # Test with syntax error
+ self.pdb._run_in_python_repl("if:")
+ mock_error.assert_called_once()
+
+ def test_registering_commands(self):
+ """Test registering breakpoint commands."""
+ # Mock get_bpbynumber
+ with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
+ # Queue up some input to send
+ self.sockfile.add_input({"reply": "commands 1"})
+ self.sockfile.add_input({"reply": "silent"})
+ self.sockfile.add_input({"reply": "print('hi')"})
+ self.sockfile.add_input({"reply": "end"})
+ self.sockfile.add_input({"signal": "EOF"})
+
+ # Run the PDB command loop
+ self.pdb.cmdloop()
+
+ outputs = self.sockfile.get_output()
+ self.assertIn('command_list', outputs[0])
+ self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"})
+ self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"})
+ self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"})
+ self.assertEqual(outputs[4], {"prompt": "(com) ", "state": "commands"})
+ self.assertEqual(outputs[5], {"prompt": "(Pdb) ", "state": "pdb"})
+ self.assertEqual(outputs[6], {"message": "\n", "type": "info"})
+ self.assertEqual(len(outputs), 7)
+
+ self.assertEqual(
+ self.pdb.commands[1],
+ ["_pdbcmd_silence_frame_status", "print('hi')"],
+ )
+
+ def test_detach(self):
+ """Test the detach method."""
+ with unittest.mock.patch.object(self.sockfile, 'close') as mock_close:
+ self.pdb.detach()
+ mock_close.assert_called_once()
+ self.assertFalse(self.pdb.quitting)
+
+ def test_cmdloop(self):
+ """Test the command loop with various commands."""
+ # Mock onecmd to track command execution
+ with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd:
+ # Add commands to the queue
+ self.pdb.cmdqueue = ['help', 'list']
+
+ # Add a command from the socket for when cmdqueue is empty
+ self.sockfile.add_input({"reply": "next"})
+
+ # Add a second command to break the loop
+ self.sockfile.add_input({"reply": "quit"})
+
+ # Configure onecmd to exit the loop on "quit"
+ def side_effect(line):
+ return line == 'quit'
+ mock_onecmd.side_effect = side_effect
+
+ # Run the command loop
+ self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace()
+ self.pdb.cmdloop()
+
+ # Should have processed 4 commands: 2 from cmdqueue, 2 from socket
+ self.assertEqual(mock_onecmd.call_count, 4)
+ mock_onecmd.assert_any_call('help')
+ mock_onecmd.assert_any_call('list')
+ mock_onecmd.assert_any_call('next')
+ mock_onecmd.assert_any_call('quit')
+
+ # Check if prompt was sent to client
+ outputs = self.sockfile.get_output()
+ prompts = [o for o in outputs if 'prompt' in o]
+ self.assertEqual(len(prompts), 2) # Should have sent 2 prompts
+
+
+@unittest.skipIf(is_wasi, "WASI does not support TCP sockets")
+class PdbConnectTestCase(unittest.TestCase):
+ """Tests for the _connect mechanism using direct socket communication."""
+
+ def setUp(self):
+ # Create a server socket that will wait for the debugger to connect
+ self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server_sock.bind(('127.0.0.1', 0)) # Let OS assign port
+ self.server_sock.listen(1)
+ self.port = self.server_sock.getsockname()[1]
+
+ def _create_script(self, script=None):
+ # Create a file for subprocess script
+ if script is None:
+ script = textwrap.dedent(
+ f"""
+ import pdb
+ import sys
+ import time
+
+ def foo():
+ x = 42
+ return bar()
+
+ def bar():
+ return 42
+
+ def connect_to_debugger():
+ # Create a frame to debug
+ def dummy_function():
+ x = 42
+ # Call connect to establish connection
+ # with the test server
+ frame = sys._getframe() # Get the current frame
+ pdb._connect(
+ host='127.0.0.1',
+ port={self.port},
+ frame=frame,
+ commands="",
+ version=pdb._PdbServer.protocol_version(),
+ )
+ return x # This line won't be reached in debugging
+
+ return dummy_function()
+
+ result = connect_to_debugger()
+ foo()
+ print(f"Function returned: {{result}}")
+ """)
+
+ self.script_path = TESTFN + "_connect_test.py"
+ with open(self.script_path, 'w') as f:
+ f.write(script)
+
+ def tearDown(self):
+ self.server_sock.close()
+ try:
+ unlink(self.script_path)
+ except OSError:
+ pass
+
+ def _connect_and_get_client_file(self):
+ """Helper to start subprocess and get connected client file."""
+ # Start the subprocess that will connect to our socket
+ process = subprocess.Popen(
+ [sys.executable, self.script_path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+
+ # Accept the connection from the subprocess
+ client_sock, _ = self.server_sock.accept()
+ client_file = client_sock.makefile('rwb')
+ self.addCleanup(client_file.close)
+ self.addCleanup(client_sock.close)
+
+ return process, client_file
+
+ def _read_until_prompt(self, client_file):
+ """Helper to read messages until a prompt is received."""
+ messages = []
+ while True:
+ data = client_file.readline()
+ if not data:
+ break
+ msg = json.loads(data.decode())
+ messages.append(msg)
+ if 'prompt' in msg:
+ break
+ return messages
+
+ def _send_command(self, client_file, command):
+ """Helper to send a command to the debugger."""
+ client_file.write(json.dumps({"reply": command}).encode() + b"\n")
+ client_file.flush()
+
+ def _send_interrupt(self, pid):
+ """Helper to send an interrupt signal to the debugger."""
+ # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
+ interrupt_script = TESTFN + "_interrupt_script.py"
+ with open(interrupt_script, 'w') as f:
+ f.write(
+ 'import pdb, sys\n'
+ 'print("Hello, world!")\n'
+ 'if inst := pdb.Pdb._last_pdb_instance:\n'
+ ' inst.set_trace(sys._getframe(1))\n'
+ )
+ self.addCleanup(unlink, interrupt_script)
+ try:
+ sys.remote_exec(pid, interrupt_script)
+ except PermissionError:
+ self.skipTest("Insufficient permissions to execute code in remote process")
+
+ def test_connect_and_basic_commands(self):
+ """Test connecting to a remote debugger and sending basic commands."""
+ self._create_script()
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+ # We should receive initial data from the debugger
+ data = client_file.readline()
+ initial_data = json.loads(data.decode())
+ self.assertIn('message', initial_data)
+ self.assertIn('pdb._connect', initial_data['message'])
+
+ # First, look for command_list message
+ data = client_file.readline()
+ command_list = json.loads(data.decode())
+ self.assertIn('command_list', command_list)
+
+ # Then, look for the first prompt
+ data = client_file.readline()
+ prompt_data = json.loads(data.decode())
+ self.assertIn('prompt', prompt_data)
+ self.assertEqual(prompt_data['state'], 'pdb')
+
+ # Send 'bt' (backtrace) command
+ self._send_command(client_file, "bt")
+
+ # Check for response - we should get some stack frames
+ messages = self._read_until_prompt(client_file)
+
+ # Extract text messages containing stack info
+ text_msg = [msg['message'] for msg in messages
+ if 'message' in msg and 'connect_to_debugger' in msg['message']]
+ got_stack_info = bool(text_msg)
+
+ expected_stacks = [
+ "<module>",
+ "connect_to_debugger",
+ ]
+
+ for stack, msg in zip(expected_stacks, text_msg, strict=True):
+ self.assertIn(stack, msg)
+
+ self.assertTrue(got_stack_info, "Should have received stack trace information")
+
+ # Send 'c' (continue) command to let the program finish
+ self._send_command(client_file, "c")
+
+ # Wait for process to finish
+ stdout, _ = process.communicate(timeout=5)
+
+ # Check if we got the expected output
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+
+ def test_breakpoints(self):
+ """Test setting and hitting breakpoints."""
+ self._create_script()
+ process, client_file = self._connect_and_get_client_file()
+ with process:
+ # Skip initial messages until we get to the prompt
+ self._read_until_prompt(client_file)
+
+ # Set a breakpoint at the return statement
+ self._send_command(client_file, "break bar")
+ messages = self._read_until_prompt(client_file)
+ bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
+ self.assertIn("Breakpoint", bp_msg)
+
+ # Continue execution until breakpoint
+ self._send_command(client_file, "c")
+ messages = self._read_until_prompt(client_file)
+
+ # Verify we hit the breakpoint
+ hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
+ self.assertIn("bar()", hit_msg)
+
+ # Check breakpoint list
+ self._send_command(client_file, "b")
+ messages = self._read_until_prompt(client_file)
+ list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
+ self.assertIn("1 breakpoint", list_msg)
+ self.assertIn("breakpoint already hit 1 time", list_msg)
+
+ # Clear breakpoint
+ self._send_command(client_file, "clear 1")
+ messages = self._read_until_prompt(client_file)
+ clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
+ self.assertIn("Deleted breakpoint", clear_msg)
+
+ # Continue to end
+ self._send_command(client_file, "c")
+ stdout, _ = process.communicate(timeout=5)
+
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+
+ def test_keyboard_interrupt(self):
+ """Test that sending keyboard interrupt breaks into pdb."""
+ synchronizer_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ synchronizer_sock.bind(('127.0.0.1', 0)) # Let OS assign port
+ synchronizer_sock.settimeout(5)
+ synchronizer_sock.listen(1)
+ self.addCleanup(synchronizer_sock.close)
+ sync_port = synchronizer_sock.getsockname()[1]
+
+ script = textwrap.dedent(f"""
+ import time
+ import sys
+ import socket
+ import pdb
+ def bar():
+ frame = sys._getframe() # Get the current frame
+ pdb._connect(
+ host='127.0.0.1',
+ port={self.port},
+ frame=frame,
+ commands="",
+ version=pdb._PdbServer.protocol_version(),
+ )
+ print("Connected to debugger")
+ iterations = 10
+ socket.create_connection(('127.0.0.1', {sync_port})).close()
+ while iterations > 0:
+ print("Iteration", iterations)
+ time.sleep(1)
+ iterations -= 1
+ return 42
+
+ if __name__ == "__main__":
+ print("Function returned:", bar())
+ """)
+ self._create_script(script=script)
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+
+ # Skip initial messages until we get to the prompt
+ self._read_until_prompt(client_file)
+
+ # Continue execution
+ self._send_command(client_file, "c")
+
+ # Wait until execution has continued
+ synchronizer_sock.accept()[0].close()
+
+ # Inject a script to interrupt the running process
+ self._send_interrupt(process.pid)
+ messages = self._read_until_prompt(client_file)
+
+ # Verify we got the keyboard interrupt message
+ interrupt_msg = next(msg['message'] for msg in messages if 'message' in msg)
+ self.assertIn("bar()", interrupt_msg)
+
+ # Continue to end
+ self._send_command(client_file, "iterations = 0")
+ self._send_command(client_file, "c")
+ stdout, _ = process.communicate(timeout=5)
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+
+ def test_handle_eof(self):
+ """Test that EOF signal properly exits the debugger."""
+ self._create_script()
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+ # Skip initial messages until we get to the prompt
+ self._read_until_prompt(client_file)
+
+ # Send EOF signal to exit the debugger
+ client_file.write(json.dumps({"signal": "EOF"}).encode() + b"\n")
+ client_file.flush()
+
+ # The process should complete normally after receiving EOF
+ stdout, stderr = process.communicate(timeout=5)
+
+ # Verify process completed correctly
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+ self.assertEqual(stderr, "")
+
+ def test_protocol_version(self):
+ """Test that incompatible protocol versions are properly detected."""
+ # Create a script using an incompatible protocol version
+ script = textwrap.dedent(f'''
+ import sys
+ import pdb
+
+ def run_test():
+ frame = sys._getframe()
+
+ # Use a fake version number that's definitely incompatible
+ fake_version = 0x01010101 # A fake version that doesn't match any real Python version
+
+ # Connect with the wrong version
+ pdb._connect(
+ host='127.0.0.1',
+ port={self.port},
+ frame=frame,
+ commands="",
+ version=fake_version,
+ )
+
+ # This should print if the debugger detaches correctly
+ print("Debugger properly detected version mismatch")
+ return True
+
+ if __name__ == "__main__":
+ print("Test result:", run_test())
+ ''')
+ self._create_script(script=script)
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+ # First message should be an error about protocol version mismatch
+ data = client_file.readline()
+ message = json.loads(data.decode())
+
+ self.assertIn('message', message)
+ self.assertEqual(message['type'], 'error')
+ self.assertIn('incompatible', message['message'])
+ self.assertIn('protocol version', message['message'])
+
+ # The process should complete normally
+ stdout, stderr = process.communicate(timeout=5)
+
+ # Verify the process completed successfully
+ self.assertIn("Test result: True", stdout)
+ self.assertIn("Debugger properly detected version mismatch", stdout)
+ self.assertEqual(process.returncode, 0)
+
+ def test_help_system(self):
+ """Test that the help system properly sends help text to the client."""
+ self._create_script()
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+ # Skip initial messages until we get to the prompt
+ self._read_until_prompt(client_file)
+
+ # Request help for different commands
+ help_commands = ["help", "help break", "help continue", "help pdb"]
+
+ for cmd in help_commands:
+ self._send_command(client_file, cmd)
+
+ # Look for help message
+ data = client_file.readline()
+ message = json.loads(data.decode())
+
+ self.assertIn('help', message)
+
+ if cmd == "help":
+ # Should just contain the command itself
+ self.assertEqual(message['help'], "")
+ else:
+ # Should contain the specific command we asked for help with
+ command = cmd.split()[1]
+ self.assertEqual(message['help'], command)
+
+ # Skip to the next prompt
+ self._read_until_prompt(client_file)
+
+ # Continue execution to finish the program
+ self._send_command(client_file, "c")
+
+ stdout, stderr = process.communicate(timeout=5)
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+
+ def test_multi_line_commands(self):
+ """Test that multi-line commands work properly over remote connection."""
+ self._create_script()
+ process, client_file = self._connect_and_get_client_file()
+
+ with process:
+ # Skip initial messages until we get to the prompt
+ self._read_until_prompt(client_file)
+
+ # Send a multi-line command
+ multi_line_commands = [
+ # Define a function
+ "def test_func():\n return 42",
+
+ # For loop
+ "for i in range(3):\n print(i)",
+
+ # If statement
+ "if True:\n x = 42\nelse:\n x = 0",
+
+ # Try/except
+ "try:\n result = 10/2\n print(result)\nexcept ZeroDivisionError:\n print('Error')",
+
+ # Class definition
+ "class TestClass:\n def __init__(self):\n self.value = 100\n def get_value(self):\n return self.value"
+ ]
+
+ for cmd in multi_line_commands:
+ self._send_command(client_file, cmd)
+ self._read_until_prompt(client_file)
+
+ # Test executing the defined function
+ self._send_command(client_file, "test_func()")
+ messages = self._read_until_prompt(client_file)
+
+ # Find the result message
+ result_msg = next(msg['message'] for msg in messages if 'message' in msg)
+ self.assertIn("42", result_msg)
+
+ # Test creating an instance of the defined class
+ self._send_command(client_file, "obj = TestClass()")
+ self._read_until_prompt(client_file)
+
+ # Test calling a method on the instance
+ self._send_command(client_file, "obj.get_value()")
+ messages = self._read_until_prompt(client_file)
+
+ # Find the result message
+ result_msg = next(msg['message'] for msg in messages if 'message' in msg)
+ self.assertIn("100", result_msg)
+
+ # Continue execution to finish
+ self._send_command(client_file, "c")
+
+ stdout, stderr = process.communicate(timeout=5)
+ self.assertIn("Function returned: 42", stdout)
+ self.assertEqual(process.returncode, 0)
+
+if __name__ == "__main__":
+ unittest.main()