import io
import time
+import itertools
import json
import os
import signal
import threading
import unittest
import unittest.mock
-from contextlib import contextmanager
+from contextlib import contextmanager, redirect_stdout, ExitStack
from pathlib import Path
from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
from test.support.os_helper import temp_dir, TESTFN, unlink
return results
+class MockDebuggerSocket:
+ """Mock file-like simulating a connection to a _RemotePdb instance"""
+
+ def __init__(self, incoming):
+ self.incoming = iter(incoming)
+ self.outgoing = []
+ self.buffered = bytearray()
+
+ def write(self, data: bytes) -> None:
+ """Simulate write to socket."""
+ self.buffered += data
+
+ def flush(self) -> None:
+ """Ensure each line is valid JSON."""
+ lines = self.buffered.splitlines(keepends=True)
+ self.buffered.clear()
+ for line in lines:
+ assert line.endswith(b"\n")
+ self.outgoing.append(json.loads(line))
+
+ def readline(self) -> bytes:
+ """Read a line from the prepared input queue."""
+ # Anything written must be flushed before trying to read,
+ # since the read will be dependent upon the last write.
+ assert not self.buffered
+ try:
+ item = next(self.incoming)
+ if not isinstance(item, bytes):
+ item = json.dumps(item).encode()
+ return item + b"\n"
+ except StopIteration:
+ return b""
+
+ def close(self) -> None:
+ """No-op close implementation."""
+ pass
+
+
+class PdbClientTestCase(unittest.TestCase):
+ """Tests for the _PdbClient class."""
+
+ def do_test(
+ self,
+ *,
+ incoming,
+ simulate_failure=None,
+ expected_outgoing=None,
+ expected_completions=None,
+ expected_exception=None,
+ expected_stdout="",
+ expected_stdout_substring="",
+ expected_state=None,
+ ):
+ if expected_outgoing is None:
+ expected_outgoing = []
+ if expected_completions is None:
+ expected_completions = []
+ if expected_state is None:
+ expected_state = {}
+
+ expected_state.setdefault("write_failed", False)
+ messages = [m for source, m in incoming if source == "server"]
+ prompts = [m["prompt"] for source, m in incoming if source == "user"]
+ sockfile = MockDebuggerSocket(messages)
+ stdout = io.StringIO()
+
+ if simulate_failure:
+ sockfile.write = unittest.mock.Mock()
+ sockfile.flush = unittest.mock.Mock()
+ if simulate_failure == "write":
+ sockfile.write.side_effect = OSError("write failed")
+ elif simulate_failure == "flush":
+ sockfile.flush.side_effect = OSError("flush failed")
+
+ input_iter = (m for source, m in incoming if source == "user")
+ completions = []
+
+ def mock_input(prompt):
+ message = next(input_iter, None)
+ if message is None:
+ raise EOFError
+
+ if req := message.get("completion_request"):
+ readline_mock = unittest.mock.Mock()
+ readline_mock.get_line_buffer.return_value = req["line"]
+ readline_mock.get_begidx.return_value = req["begidx"]
+ readline_mock.get_endidx.return_value = req["endidx"]
+ unittest.mock.seal(readline_mock)
+ with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}):
+ for param in itertools.count():
+ prefix = req["line"][req["begidx"] : req["endidx"]]
+ completion = client.complete(prefix, param)
+ if completion is None:
+ break
+ completions.append(completion)
+
+ reply = message["input"]
+ if isinstance(reply, BaseException):
+ raise reply
+ return reply
+
+ with ExitStack() as stack:
+ input_mock = stack.enter_context(
+ unittest.mock.patch("pdb.input", side_effect=mock_input)
+ )
+ stack.enter_context(redirect_stdout(stdout))
+
+ client = _PdbClient(
+ pid=0,
+ sockfile=sockfile,
+ interrupt_script="/a/b.py",
+ )
+
+ if expected_exception is not None:
+ exception = expected_exception["exception"]
+ msg = expected_exception["msg"]
+ stack.enter_context(self.assertRaises(exception, msg=msg))
+
+ client.cmdloop()
+
+ actual_outgoing = sockfile.outgoing
+ if simulate_failure:
+ actual_outgoing += [
+ json.loads(msg.args[0]) for msg in sockfile.write.mock_calls
+ ]
+
+ self.assertEqual(sockfile.outgoing, expected_outgoing)
+ self.assertEqual(completions, expected_completions)
+ if expected_stdout_substring and not expected_stdout:
+ self.assertIn(expected_stdout_substring, stdout.getvalue())
+ else:
+ self.assertEqual(stdout.getvalue(), expected_stdout)
+ input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts])
+ actual_state = {k: getattr(client, k) for k in expected_state}
+ self.assertEqual(actual_state, expected_state)
+
+ def test_remote_immediately_closing_the_connection(self):
+ """Test the behavior when the remote closes the connection immediately."""
+ incoming = []
+ expected_outgoing = []
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=expected_outgoing,
+ )
+
+ def test_handling_command_list(self):
+ """Test handling the command_list message."""
+ incoming = [
+ ("server", {"command_list": ["help", "list", "continue"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_state={
+ "pdb_commands": {"help", "list", "continue"},
+ },
+ )
+
+ def test_handling_info_message(self):
+ """Test handling a message payload with type='info'."""
+ incoming = [
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="Some message or other\n",
+ )
+
+ def test_handling_error_message(self):
+ """Test handling a message payload with type='error'."""
+ incoming = [
+ ("server", {"message": "Some message or other.", "type": "error"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="*** Some message or other.\n",
+ )
+
+ def test_handling_other_message(self):
+ """Test handling a message payload with an unrecognized type."""
+ incoming = [
+ ("server", {"message": "Some message.\n", "type": "unknown"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="Some message.\n",
+ )
+
+ def test_handling_help_for_command(self):
+ """Test handling a request to display help for a command."""
+ incoming = [
+ ("server", {"help": "ll"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring="Usage: ll | longlist",
+ )
+
+ def test_handling_help_without_a_specific_topic(self):
+ """Test handling a request to display a help overview."""
+ incoming = [
+ ("server", {"help": ""}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring="type help <topic>",
+ )
+
+ def test_handling_help_pdb(self):
+ """Test handling a request to display the full PDB manual."""
+ incoming = [
+ ("server", {"help": "pdb"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout_substring=">>> import pdb",
+ )
+
+ def test_handling_pdb_prompts(self):
+ """Test responding to pdb's normal prompts."""
+ incoming = [
+ ("server", {"command_list": ["b"]}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": "0 ]"}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": ""}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "b ["}),
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": "! b ["}),
+ ("user", {"prompt": "... ", "input": "b ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n0 ]"},
+ {"reply": ""},
+ {"reply": "b ["},
+ {"reply": "!b [\nb ]"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_handling_interact_prompts(self):
+ """Test responding to pdb's interact mode prompts."""
+ incoming = [
+ ("server", {"command_list": ["b"]}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": "0 ]"}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": ""}),
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "b ["}),
+ ("user", {"prompt": "... ", "input": "b ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n0 ]"},
+ {"reply": ""},
+ {"reply": "b [\nb ]"},
+ ],
+ expected_state={"state": "interact"},
+ )
+
+ def test_retry_pdb_prompt_on_syntax_error(self):
+ """Test re-prompting after a SyntaxError in a Python expression."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": " lst ["}),
+ ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": " 0 ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n 0 ]"},
+ ],
+ expected_stdout_substring="*** IndentationError",
+ expected_state={"state": "pdb"},
+ )
+
+ def test_retry_interact_prompt_on_syntax_error(self):
+ """Test re-prompting after a SyntaxError in a Python expression."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ ("user", {"prompt": ">>> ", "input": "!lst ["}),
+ ("user", {"prompt": ">>> ", "input": "lst ["}),
+ ("user", {"prompt": "... ", "input": " 0 ]"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "lst [\n 0 ]"},
+ ],
+ expected_stdout_substring="*** SyntaxError",
+ expected_state={"state": "interact"},
+ )
+
+ def test_handling_unrecognized_prompt_type(self):
+ """Test fallback to "dumb" single-line mode for unknown states."""
+ incoming = [
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "! ["}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "echo hello"}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": ""}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ ("user", {"prompt": "Do it? ", "input": "echo goodbye"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "! ["},
+ {"reply": "echo hello"},
+ {"reply": ""},
+ {"reply": "echo goodbye"},
+ ],
+ expected_state={"state": "dumb"},
+ )
+
+ def test_keyboard_interrupt_at_prompt(self):
+ """Test signaling when a prompt gets a KeyboardInterrupt."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "INT"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_eof_at_prompt(self):
+ """Test signaling when a prompt gets an EOFError."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": EOFError()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"signal": "EOF"},
+ ],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_unrecognized_json_message(self):
+ """Test failing after getting an unrecognized payload."""
+ incoming = [
+ ("server", {"monty": "python"}),
+ ("server", {"message": "Some message or other\n", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_exception={
+ "exception": RuntimeError,
+ "msg": 'Unrecognized payload b\'{"monty": "python"}\'',
+ },
+ )
+
+ def test_continuing_after_getting_a_non_json_payload(self):
+ """Test continuing after getting a non JSON payload."""
+ incoming = [
+ ("server", b"spam"),
+ ("server", {"message": "Something", "type": "info"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[],
+ expected_stdout="\n".join(
+ [
+ "*** Invalid JSON from remote: b'spam\\n'",
+ "Something",
+ ]
+ ),
+ )
+
+ def test_write_failing(self):
+ """Test terminating if write fails due to a half closed socket."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[{"signal": "INT"}],
+ simulate_failure="write",
+ expected_state={"write_failed": True},
+ )
+
+ def test_flush_failing(self):
+ """Test terminating if flush fails due to a half closed socket."""
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[{"signal": "INT"}],
+ simulate_failure="flush",
+ expected_state={"write_failed": True},
+ )
+
+ def test_completion_in_pdb_state(self):
+ """Test requesting tab completions at a (Pdb) prompt."""
+ # GIVEN
+ incoming = [
+ ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
+ (
+ "user",
+ {
+ "prompt": "(Pdb) ",
+ "completion_request": {
+ "line": " mod._",
+ "begidx": 8,
+ "endidx": 9,
+ },
+ "input": "print(\n mod.__name__)",
+ },
+ ),
+ ("server", {"completions": ["__name__", "__file__"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "_",
+ "line": "mod._",
+ "begidx": 4,
+ "endidx": 5,
+ }
+ },
+ {"reply": "print(\n mod.__name__)"},
+ ],
+ expected_completions=["__name__", "__file__"],
+ expected_state={"state": "pdb"},
+ )
+
+ def test_completion_in_interact_state(self):
+ """Test requesting tab completions at a >>> prompt."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": " mod.__",
+ "begidx": 8,
+ "endidx": 10,
+ },
+ "input": "print(\n mod.__name__)",
+ },
+ ),
+ ("server", {"completions": ["__name__", "__file__"]}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "__",
+ "line": "mod.__",
+ "begidx": 4,
+ "endidx": 6,
+ }
+ },
+ {"reply": "print(\n mod.__name__)"},
+ ],
+ expected_completions=["__name__", "__file__"],
+ expected_state={"state": "interact"},
+ )
+
+ def test_completion_in_unknown_state(self):
+ """Test requesting tab completions at an unrecognized prompt."""
+ incoming = [
+ ("server", {"command_list": ["p"]}),
+ ("server", {"prompt": "Do it? ", "state": "confirm"}),
+ (
+ "user",
+ {
+ "prompt": "Do it? ",
+ "completion_request": {
+ "line": "_",
+ "begidx": 0,
+ "endidx": 1,
+ },
+ "input": "__name__",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {"reply": "__name__"},
+ ],
+ expected_state={"state": "dumb"},
+ )
+
+ def test_write_failure_during_completion(self):
+ """Test failing to write to the socket to request tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ simulate_failure="write",
+ expected_completions=[],
+ expected_state={"state": "interact", "write_failed": True},
+ )
+
+ def test_flush_failure_during_completion(self):
+ """Test failing to flush to the socket to request tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ simulate_failure="flush",
+ expected_completions=[],
+ expected_state={"state": "interact", "write_failed": True},
+ )
+
+ def test_read_failure_during_completion(self):
+ """Test failing to read tab completions from the socket."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+ def test_reading_invalid_json_during_completion(self):
+ """Test receiving invalid JSON when getting tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ("server", b'{"completions": '),
+ ("user", {"prompt": ">>> ", "input": "xyz"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_stdout_substring="*** json.decoder.JSONDecodeError",
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+ def test_reading_empty_json_during_completion(self):
+ """Test receiving an empty JSON object when getting tab completions."""
+ incoming = [
+ ("server", {"prompt": ">>> ", "state": "interact"}),
+ (
+ "user",
+ {
+ "prompt": ">>> ",
+ "completion_request": {
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ },
+ "input": "xyz",
+ },
+ ),
+ ("server", {}),
+ ("user", {"prompt": ">>> ", "input": "xyz"}),
+ ]
+ self.do_test(
+ incoming=incoming,
+ expected_outgoing=[
+ {
+ "complete": {
+ "text": "xy",
+ "line": "xy",
+ "begidx": 0,
+ "endidx": 2,
+ }
+ },
+ {"reply": "xyz"},
+ ],
+ expected_stdout=(
+ "*** RuntimeError: Failed to get valid completions."
+ " Got: {}\n"
+ ),
+ expected_completions=[],
+ expected_state={"state": "interact"},
+ )
+
+
class RemotePdbTestCase(unittest.TestCase):
"""Tests for the _PdbServer class."""