import io
import itertools
import json
import os
import re
import signal
import socket
import subprocess
import sys
import textwrap
import unittest
import unittest.mock
from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack
from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT, subTests
from test.support.os_helper import TESTFN, unlink
from typing import List

import pdb
from pdb import _PdbServer, _PdbClient


# Skip the whole module (by raising SkipTest at import time) when the
# interpreter reports that remote debugging is disabled.
if not sys.is_remote_debug_enabled():
    raise unittest.SkipTest('remote debugging is disabled')


@contextmanager
def kill_on_error(proc):
    """Enter *proc*'s own context and yield it, killing it if the body raises.

    Any exception escaping the ``with`` body (including ``KeyboardInterrupt``
    and other ``BaseException`` subclasses) triggers ``proc.kill()`` and is
    then re-raised unchanged.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Make sure the child is dead before proc's own __exit__ runs,
            # then let the original exception propagate.
            proc.kill()
            raise


class MockSocketFile:
    """In-memory stand-in for a socket file, used to test _PdbServer offline."""

    def __init__(self):
        # Encoded lines waiting to be handed out by readline(), oldest first.
        self.input_queue = []
        # Every chunk passed to write(), in call order.
        self.output_buffer = []

    def write(self, data: bytes) -> None:
        """Record *data* as if it had been written to the socket."""
        self.output_buffer.append(data)

    def flush(self) -> None:
        """Do nothing; there is no underlying stream to flush."""

    def readline(self) -> bytes:
        """Return the oldest queued line, or ``b""`` when none is queued."""
        return self.input_queue.pop(0) if self.input_queue else b""

    def close(self) -> None:
        """Do nothing; there is no underlying stream to close."""

    def add_input(self, data: dict) -> None:
        """Queue *data*, JSON-encoded and newline-terminated, for readline()."""
        line = json.dumps(data).encode() + b"\n"
        self.input_queue.append(line)

    def get_output(self) -> List[dict]:
        """Return the decoded JSON payloads written so far and reset the buffer.

        Only ``bytes`` chunks that end with a newline are considered; chunks
        that do not parse as JSON are dropped.
        """
        decoded = []
        for chunk in self.output_buffer:
            if not isinstance(chunk, bytes) or not chunk.endswith(b"\n"):
                continue
            try:
                payload = json.loads(chunk.decode().strip())
            except json.JSONDecodeError:
                continue  # Not JSON: skip it
            decoded.append(payload)
        self.output_buffer = []
        return decoded


class PdbClientTestCase(unittest.TestCase):
    """Tests for the _PdbClient class."""

    def do_test(
        self,
        *,
        incoming,
        simulate_send_failure=False,
        simulate_sigint_during_stdout_write=False,
        use_interrupt_socket=False,
        expected_outgoing=None,
        expected_outgoing_signals=None,
        expected_completions=None,
        expected_exception=None,
        expected_stdout="",
        expected_stdout_substring="",
        expected_state=None,
    ):
        """Run a scripted ``_PdbClient`` session and check everything it did.

        ``incoming`` is an ordered list of ``(source, payload)`` pairs:

        * ``"server"`` payloads are written to the client's socket up front.
          Dicts are JSON-encoded, ``bytes`` are sent as-is, and each one is
          followed by a newline.
        * ``"user"`` payloads script successive calls to the patched
          ``pdb.input``.  ``"prompt"`` is the prompt that call is expected to
          receive.  ``"input"`` is a string to return, an exception instance
          to raise, or a callable whose result is returned.  An optional
          ``"completion_request"`` (``line``/``begidx``/``endidx``) makes the
          fake input call ``client.complete()`` repeatedly, under a mocked
          ``readline`` module, before the input is delivered.

        Failure simulation:

        * ``simulate_send_failure`` makes every ``sendall()`` on the client's
          socket raise ``OSError`` (attempted sends are still recorded).
        * ``simulate_sigint_during_stdout_write`` raises ``SIGINT`` on every
          write to the captured stdout.
        * ``use_interrupt_socket`` gives the client a mock interrupt socket;
          otherwise ``os.kill`` is patched and its calls are inspected.

        Expectations (each defaults to "nothing"): ``expected_outgoing`` is
        the list of decoded JSON messages sent, ``expected_outgoing_signals``
        the signals delivered, ``expected_completions`` the collected
        completions, ``expected_exception`` a dict with ``"exception"`` and
        ``"msg"`` keys, and ``expected_state`` a mapping of client attribute
        names to values (``write_failed`` defaults to ``False``).  Stdout is
        checked with ``expected_stdout_substring`` when that is given and
        ``expected_stdout`` is empty, and for exact equality with
        ``expected_stdout`` otherwise.
        """
        if expected_outgoing is None:
            expected_outgoing = []
        if expected_outgoing_signals is None:
            expected_outgoing_signals = []
        if expected_completions is None:
            expected_completions = []

        # Build a fresh dict instead of calling setdefault() on the caller's
        # object, so the argument passed in is never mutated.
        expected_state = {"write_failed": False, **(expected_state or {})}

        messages = [m for source, m in incoming if source == "server"]
        prompts = [m["prompt"] for source, m in incoming if source == "user"]

        input_iter = (m for source, m in incoming if source == "user")
        completions = []

        def mock_input(prompt):
            message = next(input_iter, None)
            if message is None:
                # The script has run out of user input.
                raise EOFError

            if req := message.get("completion_request"):
                readline_mock = unittest.mock.Mock()
                readline_mock.get_line_buffer.return_value = req["line"]
                readline_mock.get_begidx.return_value = req["begidx"]
                readline_mock.get_endidx.return_value = req["endidx"]
                # Sealing makes any other readline attribute access an error.
                unittest.mock.seal(readline_mock)
                with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}):
                    # Ask for completion 0, 1, 2, ... until None is returned.
                    for param in itertools.count():
                        prefix = req["line"][req["begidx"] : req["endidx"]]
                        completion = client.complete(prefix, param)
                        if completion is None:
                            break
                        completions.append(completion)

            reply = message["input"]
            if isinstance(reply, BaseException):
                raise reply
            if isinstance(reply, str):
                return reply
            return reply()

        with ExitStack() as stack:
            client_sock, server_sock = socket.socketpair()
            stack.enter_context(closing(client_sock))
            stack.enter_context(closing(server_sock))

            # Wrap the real socket so sendall() calls are recorded.
            server_sock = unittest.mock.Mock(wraps=server_sock)

            # Pre-load every server message, then half-close so the client
            # sees EOF once it has consumed them.
            client_sock.sendall(
                b"".join(
                    (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n"
                    for m in messages
                )
            )
            client_sock.shutdown(socket.SHUT_WR)

            if simulate_send_failure:
                server_sock.sendall = unittest.mock.Mock(
                    side_effect=OSError("sendall failed")
                )
                client_sock.shutdown(socket.SHUT_RD)

            stdout = io.StringIO()

            if simulate_sigint_during_stdout_write:
                orig_stdout_write = stdout.write

                def sigint_stdout_write(s):
                    signal.raise_signal(signal.SIGINT)
                    return orig_stdout_write(s)

                stdout.write = sigint_stdout_write

            input_mock = stack.enter_context(
                unittest.mock.patch("pdb.input", side_effect=mock_input)
            )
            stack.enter_context(redirect_stdout(stdout))

            if use_interrupt_socket:
                interrupt_sock = unittest.mock.Mock(spec=socket.socket)
                mock_kill = None
            else:
                interrupt_sock = None
                mock_kill = stack.enter_context(
                    unittest.mock.patch("os.kill", spec=os.kill)
                )

            client = _PdbClient(
                pid=12345,
                server_socket=server_sock,
                interrupt_sock=interrupt_sock,
            )

            if expected_exception is not None:
                exception = expected_exception["exception"]
                msg = expected_exception["msg"]
                # NOTE(review): as a context manager, assertRaises treats
                # ``msg`` as the failure message shown when nothing is raised;
                # the text of the raised exception is not compared.
                stack.enter_context(self.assertRaises(exception, msg=msg))

            client.cmdloop()

        sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls]
        for msg in sent_msgs:
            # Use a unittest assertion, not a bare ``assert``: this module is
            # also run under -O/-OO, where assert statements are stripped.
            self.assertTrue(
                msg.endswith(b"\n"),
                f"outgoing message is not newline-terminated: {msg!r}",
            )
        actual_outgoing = [json.loads(msg) for msg in sent_msgs]

        self.assertEqual(actual_outgoing, expected_outgoing)
        self.assertEqual(completions, expected_completions)
        if expected_stdout_substring and not expected_stdout:
            self.assertIn(expected_stdout_substring, stdout.getvalue())
        else:
            self.assertEqual(stdout.getvalue(), expected_stdout)
        input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts])
        actual_state = {k: getattr(client, k) for k in expected_state}
        self.assertEqual(actual_state, expected_state)

        if use_interrupt_socket:
            outgoing_signals = [
                signal.Signals(int.from_bytes(call.args[0]))
                for call in interrupt_sock.sendall.call_args_list
            ]
        else:
            self.assertIsNotNone(mock_kill)
            outgoing_signals = []
            for call in mock_kill.call_args_list:
                pid, signum = call.args
                self.assertEqual(pid, 12345)
                outgoing_signals.append(signal.Signals(signum))
        self.assertEqual(outgoing_signals, expected_outgoing_signals)

    def test_remote_immediately_closing_the_connection(self):
        """Check that nothing is sent when the remote closes right away."""
        # An empty script: no server messages and no user input.
        self.do_test(incoming=[], expected_outgoing=[])

    def test_handling_command_list(self):
        """Check that a command_list payload is stored as ``pdb_commands``."""
        commands = ["help", "list", "continue"]
        self.do_test(
            incoming=[("server", {"command_list": commands})],
            expected_outgoing=[],
            # The client is expected to keep the names as a set.
            expected_state={"pdb_commands": set(commands)},
        )

    def test_handling_info_message(self):
        """Check that a type='info' message is printed unchanged."""
        text = "Some message or other\n"
        self.do_test(
            incoming=[("server", {"message": text, "type": "info"})],
            expected_outgoing=[],
            expected_stdout=text,
        )

    def test_handling_error_message(self):
        """Check that a type='error' message is printed with a '*** ' prefix."""
        server_events = [
            ("server", {"message": "Some message or other.", "type": "error"}),
        ]
        self.do_test(
            incoming=server_events,
            expected_outgoing=[],
            # Error text gains the prefix and a trailing newline.
            expected_stdout="*** Some message or other.\n",
        )

    def test_handling_other_message(self):
        """Check that a message with an unknown type is printed unchanged."""
        text = "Some message.\n"
        self.do_test(
            incoming=[("server", {"message": text, "type": "unknown"})],
            expected_outgoing=[],
            expected_stdout=text,
        )

    @unittest.skipIf(sys.flags.optimize >= 2, "Help not available for -OO")
    @subTests(
        "help_request,expected_substring",
        [
            # help for a single command
            ({"help": "ll"}, "Usage: ll | longlist"),
            # the help overview
            ({"help": ""}, "type help <topic>"),
            # the full PDB manual
            ({"help": "pdb"}, ">>> import pdb"),
        ],
    )
    def test_handling_help_when_available(self, help_request, expected_substring):
        """Check the text printed for each help request when help exists."""
        self.do_test(
            incoming=[("server", help_request)],
            expected_outgoing=[],
            expected_stdout_substring=expected_substring,
        )

    @unittest.skipIf(sys.flags.optimize < 2, "Needs -OO")
    @subTests(
        "help_request,expected_substring",
        [
            # help for a single command
            ({"help": "ll"}, "No help for 'll'"),
            # the help overview
            ({"help": ""}, "Undocumented commands"),
            # the full PDB manual
            ({"help": "pdb"}, "No help for 'pdb'"),
        ],
    )
    def test_handling_help_when_not_available(self, help_request, expected_substring):
        """Check the text printed for each help request when running -OO."""
        self.do_test(
            incoming=[("server", help_request)],
            expected_outgoing=[],
            expected_stdout_substring=expected_substring,
        )

    def test_handling_pdb_prompts(self):
        """Check the replies sent for input typed at ordinary (Pdb) prompts."""
        server_prompt = ("server", {"prompt": "(Pdb) ", "state": "pdb"})
        script = [
            ("server", {"command_list": ["b"]}),
            # Two-line input: the second line is read at a continuation prompt.
            server_prompt,
            ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
            ("user", {"prompt": "...   ", "input": "0 ]"}),
            # Empty input.
            server_prompt,
            ("user", {"prompt": "(Pdb) ", "input": ""}),
            # "b" is in the command list; the line is sent without continuation.
            server_prompt,
            ("user", {"prompt": "(Pdb) ", "input": "b ["}),
            # With a leading "!" the same text gets a continuation prompt.
            server_prompt,
            ("user", {"prompt": "(Pdb) ", "input": "! b ["}),
            ("user", {"prompt": "...   ", "input": "b ]"}),
        ]
        replies = ["lst [\n0 ]", "", "b [", "!b [\nb ]"]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"reply": text} for text in replies],
            expected_state={"state": "pdb"},
        )

    def test_handling_interact_prompts(self):
        """Check the replies sent for input typed at interact-mode prompts."""
        server_prompt = ("server", {"prompt": ">>> ", "state": "interact"})
        script = [
            ("server", {"command_list": ["b"]}),
            # Two-line input: the second line is read at a continuation prompt.
            server_prompt,
            ("user", {"prompt": ">>> ", "input": "lst ["}),
            ("user", {"prompt": "... ", "input": "0 ]"}),
            # Empty input.
            server_prompt,
            ("user", {"prompt": ">>> ", "input": ""}),
            # Here "b [" gets a continuation prompt even though "b" is in the
            # command list.
            server_prompt,
            ("user", {"prompt": ">>> ", "input": "b ["}),
            ("user", {"prompt": "... ", "input": "b ]"}),
        ]
        replies = ["lst [\n0 ]", "", "b [\nb ]"]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"reply": text} for text in replies],
            expected_state={"state": "interact"},
        )

    def test_retry_pdb_prompt_on_syntax_error(self):
        """Check that the (Pdb) prompt is repeated after invalid Python."""
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            # Leading whitespace: expected to be reported and re-prompted.
            ("user", {"prompt": "(Pdb) ", "input": " lst ["}),
            # Second attempt, completed at the continuation prompt.
            ("user", {"prompt": "(Pdb) ", "input": "lst ["}),
            ("user", {"prompt": "...   ", "input": " 0 ]"}),
        ]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"reply": "lst [\n 0 ]"}],
            expected_stdout_substring="*** IndentationError",
            expected_state={"state": "pdb"},
        )

    def test_retry_interact_prompt_on_syntax_error(self):
        """Check that the >>> prompt is repeated after invalid Python."""
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            # Expected to be reported as a SyntaxError and re-prompted.
            ("user", {"prompt": ">>> ", "input": "!lst ["}),
            # Second attempt, completed at the continuation prompt.
            ("user", {"prompt": ">>> ", "input": "lst ["}),
            ("user", {"prompt": "... ", "input": " 0 ]"}),
        ]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"reply": "lst [\n 0 ]"}],
            expected_stdout_substring="*** SyntaxError",
            expected_state={"state": "interact"},
        )

    def test_handling_unrecognized_prompt_type(self):
        """Check that an unknown prompt state falls back to "dumb" mode.

        Every line typed is expected to be sent back verbatim as one reply.
        """
        confirm_prompt = ("server", {"prompt": "Do it? ", "state": "confirm"})
        typed_lines = ["! [", "echo hello", "", "echo goodbye"]

        # Each typed line is preceded by a fresh prompt from the server.
        script = []
        for line in typed_lines:
            script.append(confirm_prompt)
            script.append(("user", {"prompt": "Do it? ", "input": line}))

        self.do_test(
            incoming=script,
            expected_outgoing=[{"reply": line} for line in typed_lines],
            expected_state={"state": "dumb"},
        )

    def test_sigint_at_prompt(self):
        """Check that SIGINT at a prompt sends an INT signal message."""
        interrupted_input = {
            "prompt": "(Pdb) ",
            # The fake input() call raises SIGINT instead of returning text.
            "input": lambda: signal.raise_signal(signal.SIGINT),
        }
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            ("user", interrupted_input),
        ]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"signal": "INT"}],
            expected_state={"state": "pdb"},
        )

    def test_sigint_at_continuation_prompt(self):
        """Check that SIGINT at a continuation prompt sends an INT signal."""
        interrupted_input = {
            "prompt": "...   ",
            # The fake input() call raises SIGINT instead of returning text.
            "input": lambda: signal.raise_signal(signal.SIGINT),
        }
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
            ("user", interrupted_input),
        ]
        self.do_test(
            incoming=script,
            # Only the signal is expected; no reply for the partial input.
            expected_outgoing=[{"signal": "INT"}],
            expected_state={"state": "pdb"},
        )

    def test_sigint_when_writing(self):
        """Test signaling when sys.stdout.write() gets interrupted."""
        text = "Some message or other\n"
        script = [("server", {"message": text, "type": "info"})]
        # Cover both delivery paths: os.kill and the interrupt socket.
        for via_socket in (False, True):
            with self.subTest(use_interrupt_socket=via_socket):
                self.do_test(
                    incoming=script,
                    simulate_sigint_during_stdout_write=True,
                    use_interrupt_socket=via_socket,
                    expected_outgoing=[],
                    expected_outgoing_signals=[signal.SIGINT],
                    expected_stdout=text,
                )

    def test_eof_at_prompt(self):
        """Check that EOFError at a prompt sends an EOF signal message."""
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            # The fake input() call raises this exception instance.
            ("user", {"prompt": "(Pdb) ", "input": EOFError()}),
        ]
        self.do_test(
            incoming=script,
            expected_outgoing=[{"signal": "EOF"}],
            expected_state={"state": "pdb"},
        )

    def test_unrecognized_json_message(self):
        """Check that an unrecognized payload makes the client raise."""
        script = [
            ("server", {"monty": "python"}),
            # Queued behind the bad payload; no stdout output is expected.
            ("server", {"message": "Some message or other\n", "type": "info"}),
        ]
        failure = {
            "exception": RuntimeError,
            "msg": 'Unrecognized payload b\'{"monty": "python"}\'',
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[],
            expected_exception=failure,
        )

    def test_continuing_after_getting_a_non_json_payload(self):
        """Check that a non-JSON line is reported and processing continues."""
        script = [
            ("server", b"spam"),
            ("server", {"message": "Something", "type": "info"}),
        ]
        # First the complaint about the raw line, then the following message.
        expected_lines = (
            "*** Invalid JSON from remote: b'spam\\n'",
            "Something",
        )
        self.do_test(
            incoming=script,
            expected_outgoing=[],
            expected_stdout="\n".join(expected_lines),
        )

    def test_write_failing(self):
        """Check that a failed socket write is recorded as ``write_failed``."""
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            # The interrupt makes the client try to send, which then fails.
            ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}),
        ]
        self.do_test(
            incoming=script,
            simulate_send_failure=True,
            # The attempted send is still recorded by the mocked sendall().
            expected_outgoing=[{"signal": "INT"}],
            expected_state={"write_failed": True},
        )

    def test_completion_in_pdb_state(self):
        """Check the completion request sent from a (Pdb) prompt."""
        request = {"line": "    mod._", "begidx": 8, "endidx": 9}
        typed = "print(\n    mod.__name__)"
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            (
                "user",
                {"prompt": "(Pdb) ", "completion_request": request, "input": typed},
            ),
            ("server", {"completions": ["__name__", "__file__"]}),
        ]
        # The request is expected without the line's leading whitespace, with
        # the indices shifted to match.
        complete_msg = {
            "complete": {"text": "_", "line": "mod._", "begidx": 4, "endidx": 5}
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[complete_msg, {"reply": typed}],
            expected_completions=["__name__", "__file__"],
            expected_state={"state": "pdb"},
        )

    def test_multiline_completion_in_pdb_state(self):
        """Check the completion request sent from a (Pdb) continuation prompt."""
        request = {"line": "    b", "begidx": 4, "endidx": 5}
        script = [
            ("server", {"prompt": "(Pdb) ", "state": "pdb"}),
            ("user", {"prompt": "(Pdb) ", "input": "if True:"}),
            (
                "user",
                {
                    "prompt": "...   ",
                    "completion_request": request,
                    "input": "    bool()",
                },
            ),
            ("server", {"completions": ["bin", "bool", "bytes"]}),
            # An empty line ends the multi-line block.
            ("user", {"prompt": "...   ", "input": ""}),
        ]
        # On a continuation line the request is expected with a "! " prefix
        # in place of the indentation.
        complete_msg = {
            "complete": {"text": "b", "line": "! b", "begidx": 2, "endidx": 3}
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[
                complete_msg,
                {"reply": "if True:\n    bool()\n"},
            ],
            expected_completions=["bin", "bool", "bytes"],
            expected_state={"state": "pdb"},
        )

    def test_completion_in_interact_state(self):
        """Check the completion request sent from a >>> prompt."""
        request = {"line": "    mod.__", "begidx": 8, "endidx": 10}
        typed = "print(\n    mod.__name__)"
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            (
                "user",
                {"prompt": ">>> ", "completion_request": request, "input": typed},
            ),
            ("server", {"completions": ["__name__", "__file__"]}),
        ]
        # The request is expected without the line's leading whitespace, with
        # the indices shifted to match.
        complete_msg = {
            "complete": {"text": "__", "line": "mod.__", "begidx": 4, "endidx": 6}
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[complete_msg, {"reply": typed}],
            expected_completions=["__name__", "__file__"],
            expected_state={"state": "interact"},
        )

    def test_completion_in_unknown_state(self):
        """Check that no completion request is sent at an unknown prompt."""
        request = {"line": "_", "begidx": 0, "endidx": 1}
        script = [
            ("server", {"command_list": ["p"]}),
            ("server", {"prompt": "Do it? ", "state": "confirm"}),
            (
                "user",
                {
                    "prompt": "Do it? ",
                    "completion_request": request,
                    "input": "__name__",
                },
            ),
        ]
        self.do_test(
            incoming=script,
            # Only the reply goes out; expected_completions defaults to [].
            expected_outgoing=[{"reply": "__name__"}],
            expected_state={"state": "dumb"},
        )

    def test_write_failure_during_completion(self):
        """Check completion when sending the request to the socket fails."""
        request = {"line": "xy", "begidx": 0, "endidx": 2}
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            (
                "user",
                {"prompt": ">>> ", "completion_request": request, "input": "xyz"},
            ),
        ]
        # Both sends are attempted (and recorded) even though each one fails.
        complete_msg = {
            "complete": {"text": "xy", "line": "xy", "begidx": 0, "endidx": 2}
        }
        self.do_test(
            incoming=script,
            simulate_send_failure=True,
            expected_outgoing=[complete_msg, {"reply": "xyz"}],
            expected_completions=[],
            expected_state={"state": "interact", "write_failed": True},
        )

    def test_read_failure_during_completion(self):
        """Check completion when no completions can be read from the socket."""
        request = {"line": "xy", "begidx": 0, "endidx": 2}
        # The server script ends after the prompt, so no completions payload
        # ever arrives in answer to the request.
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            (
                "user",
                {"prompt": ">>> ", "completion_request": request, "input": "xyz"},
            ),
        ]
        complete_msg = {
            "complete": {"text": "xy", "line": "xy", "begidx": 0, "endidx": 2}
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[complete_msg, {"reply": "xyz"}],
            expected_completions=[],
            expected_state={"state": "interact"},
        )

    def test_reading_invalid_json_during_completion(self):
        """Check completion when the server answers with malformed JSON."""
        request = {"line": "xy", "begidx": 0, "endidx": 2}
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            (
                "user",
                {"prompt": ">>> ", "completion_request": request, "input": "xyz"},
            ),
            # A truncated JSON document in place of the completions payload.
            ("server", b'{"completions": '),
            ("user", {"prompt": ">>> ", "input": "xyz"}),
        ]
        complete_msg = {
            "complete": {"text": "xy", "line": "xy", "begidx": 0, "endidx": 2}
        }
        self.do_test(
            incoming=script,
            expected_outgoing=[complete_msg, {"reply": "xyz"}],
            expected_stdout_substring="*** json.decoder.JSONDecodeError",
            expected_completions=[],
            expected_state={"state": "interact"},
        )

    def test_reading_empty_json_during_completion(self):
        """Check completion when the server answers with an empty JSON object."""
        request = {"line": "xy", "begidx": 0, "endidx": 2}
        script = [
            ("server", {"prompt": ">>> ", "state": "interact"}),
            (
                "user",
                {"prompt": ">>> ", "completion_request": request, "input": "xyz"},
            ),
            # Valid JSON, but without a "completions" key.
            ("server", {}),
            ("user", {"prompt": ">>> ", "input": "xyz"}),
        ]
        complete_msg = {
            "complete": {"text": "xy", "line": "xy", "begidx": 0, "endidx": 2}
        }
        error_text = (
            "*** RuntimeError: Failed to get valid completions."
            " Got: {}\n"
        )
        self.do_test(
            incoming=script,
            expected_outgoing=[complete_msg, {"reply": "xyz"}],
            expected_stdout=error_text,
            expected_completions=[],
            expected_state={"state": "interact"},
        )


class RemotePdbTestCase(unittest.TestCase):
    """Tests for the _PdbServer class."""

    def setUp(self):
        """Create a _PdbServer backed by a mock socket file and a fake frame."""
        self.sockfile = MockSocketFile()
        self.pdb = _PdbServer(self.sockfile)

        # These Bdb attributes are normally created lazily once tracing
        # starts, so set them by hand.
        self.pdb.botframe = None
        self.pdb.quitting = False

        # Namespaces exposed through the fake frame.
        self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
        self.test_locals = {'c': 3, 'd': 4}

        # A mock standing in for a real frame object.
        fake_code = unittest.mock.Mock()
        fake_code.co_filename = "test_file.py"
        fake_code.co_name = "test_function"

        fake_frame = unittest.mock.Mock()
        fake_frame.f_globals = self.test_globals
        fake_frame.f_locals = self.test_locals
        fake_frame.f_lineno = 42
        fake_frame.f_code = fake_code

        self.pdb.curframe = fake_frame

    def test_message_and_error(self):
        """Check the JSON payloads produced by message() and error()."""
        self.pdb.message("Test message")
        self.pdb.error("Test error")

        # Exactly two payloads, in call order; only the info text gains a
        # trailing newline.
        self.assertEqual(
            self.sockfile.get_output(),
            [
                {"message": "Test message\n", "type": "info"},
                {"message": "Test error", "type": "error"},
            ],
        )

    def test_read_command(self):
        """Check that _read_reply() returns the text of a queued reply."""
        self.sockfile.add_input({"reply": "help"})
        self.assertEqual(self.pdb._read_reply(), "help")

    def test_read_command_EOF(self):
        """Check that _read_reply() raises EOFError after a failed write."""
        # Pretend an earlier write to the socket already failed.
        self.pdb._write_failed = True
        self.assertRaises(EOFError, self.pdb._read_reply)

    def test_completion(self):
        """Check that a completion request is answered before the next reply."""
        candidates = ["continue", "clear"]
        request = {"text": "c", "line": "c", "begidx": 0, "endidx": 1}

        # The completion request comes first; the ordinary reply queued after
        # it lets _read_reply() return.
        self.sockfile.add_input({"complete": request})
        self.sockfile.add_input({"reply": "help"})

        # Force completenames() to return a known list.
        with unittest.mock.patch.object(
            self.pdb, 'completenames', return_value=["continue", "clear"]
        ):
            cmd = self.pdb._read_reply()
            outputs = self.sockfile.get_output()

        # One completions payload was written back to the socket...
        self.assertEqual(len(outputs), 1)
        self.assertEqual(outputs[0], {"completions": candidates})
        # ...and the ordinary reply is what _read_reply() returned.
        self.assertEqual(cmd, "help")

    def test_do_help(self):
        """Check that do_help() sends a single help payload for its argument."""
        self.pdb.do_help("break")
        self.assertEqual(self.sockfile.get_output(), [{"help": "break"}])

    def test_interact_mode(self):
        """Check that interact mode initializes and reports bad code only."""
        self.pdb.do_interact("")

        # Entering interact mode must leave a dict in _interact_state.
        state = self.pdb._interact_state
        self.assertIsNotNone(state)
        self.assertIsInstance(state, dict)

        with unittest.mock.patch.object(self.pdb, '_error_exc') as report_error:
            # Valid code: no error is reported.
            self.pdb._run_in_python_repl("print('test')")
            report_error.assert_not_called()

            # Invalid code: the error is reported exactly once.
            self.pdb._run_in_python_repl("if:")
            report_error.assert_called_once()

    def test_registering_commands(self):
        """Check the prompts sent while breakpoint commands are registered."""
        queued = [
            {"reply": "commands 1"},
            {"reply": "silent"},
            {"reply": "print('hi')"},
            {"reply": "end"},
            {"signal": "EOF"},
        ]
        pdb_prompt = {"prompt": "(Pdb) ", "state": "pdb"}
        com_prompt = {"prompt": "(com) ", "state": "commands"}

        # get_bpbynumber is mocked out for the duration of the loop.
        with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
            for payload in queued:
                self.sockfile.add_input(payload)

            self.pdb.cmdloop()

            outputs = self.sockfile.get_output()
            # The command list comes first, then one prompt per line read,
            # then a final newline message.
            self.assertIn('command_list', outputs[0])
            self.assertEqual(
                outputs[1:],
                [
                    pdb_prompt,
                    com_prompt,
                    com_prompt,
                    com_prompt,
                    pdb_prompt,
                    {"message": "\n", "type": "info"},
                ],
            )
            self.assertEqual(len(outputs), 7)

            self.assertEqual(
                self.pdb.commands[1],
                ["_pdbcmd_silence_frame_status", "print('hi')"],
            )

    def test_detach(self):
        """Check that detach() closes the socket file without quitting."""
        with unittest.mock.patch.object(self.sockfile, 'close') as close_spy:
            self.pdb.detach()
            close_spy.assert_called_once()
            self.assertFalse(self.pdb.quitting)

    def test_cmdloop(self):
        """Check that cmdloop() drains cmdqueue first, then reads the socket."""
        # Stand-in for onecmd(): report "stop" only for the quit command.
        def stop_on_quit(line):
            return line == 'quit'

        with unittest.mock.patch.object(
            self.pdb, 'onecmd', return_value=False, side_effect=stop_on_quit
        ) as mock_onecmd:
            # Two commands are already queued locally...
            self.pdb.cmdqueue = ['help', 'list']
            # ...and two more will arrive over the socket; the second one
            # ends the loop.
            for text in ("next", "quit"):
                self.sockfile.add_input({"reply": text})

            # Set by hand because we don't want to really call set_trace().
            self.pdb.quitting = False
            self.pdb.cmdloop()

            # 2 commands from cmdqueue plus 2 from the socket.
            self.assertEqual(mock_onecmd.call_count, 4)
            for expected_line in ('help', 'list', 'next', 'quit'):
                mock_onecmd.assert_any_call(expected_line)

            # A prompt is sent only for the commands read from the socket.
            outputs = self.sockfile.get_output()
            prompts = [o for o in outputs if 'prompt' in o]
            self.assertEqual(len(prompts), 2)


@requires_subprocess()
@unittest.skipIf(is_wasi, "WASI does not support TCP sockets")
class PdbConnectTestCase(unittest.TestCase):
    """Tests for the _connect mechanism using direct socket communication.

    Each test writes a small script to disk and runs it in a subprocess.
    The script calls ``pdb._connect()`` back to a listening socket owned by
    the test, and the test then plays the role of the client by exchanging
    newline-delimited JSON messages with the debugger.
    """

    def setUp(self):
        # Create a server socket that will wait for the debugger to connect
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.bind(('127.0.0.1', 0))  # Let OS assign port
        self.server_sock.listen(1)
        # Bound the wait in accept(): if the subprocess dies before it
        # connects, the test must fail instead of hanging forever.
        self.server_sock.settimeout(SHORT_TIMEOUT)
        self.port = self.server_sock.getsockname()[1]
        # Set by _create_script(); tearDown() must cope with it being unset.
        self.script_path = None

    def _create_script(self, script=None):
        """Write the script run by the subprocess and record its path.

        When *script* is None a default script is used: it connects to the
        test's socket from a nested function, then calls foo() and prints
        the value returned by the function that connected.
        """
        if script is None:
            script = textwrap.dedent(
                f"""
                import pdb
                import sys
                import time

                def foo():
                    x = 42
                    return bar()

                def bar():
                    return 42

                def connect_to_debugger():
                    # Create a frame to debug
                    def dummy_function():
                        x = 42
                        # Call connect to establish connection
                        # with the test server
                        frame = sys._getframe()  # Get the current frame
                        pdb._connect(
                            host='127.0.0.1',
                            port={self.port},
                            frame=frame,
                            commands="",
                            version=pdb._PdbServer.protocol_version(),
                            signal_raising_thread=False,
                            colorize=False,
                        )
                        return x  # This line won't be reached in debugging

                    return dummy_function()

                result = connect_to_debugger()
                foo()
                print(f"Function returned: {{result}}")
                """)

        self.script_path = TESTFN + "_connect_test.py"
        with open(self.script_path, 'w', encoding='utf-8') as f:
            f.write(script)

    def tearDown(self):
        self.server_sock.close()
        if self.script_path is not None:
            try:
                unlink(self.script_path)
            except OSError:
                pass

    def _connect_and_get_client_file(self):
        """Start the script in a subprocess and accept its connection.

        Returns a ``(process, client_file)`` pair, where *client_file* is a
        binary read/write file object wrapping the accepted socket.  The
        socket and the file are closed automatically at test cleanup.
        """
        # Start the subprocess that will connect to our socket
        process = subprocess.Popen(
            [sys.executable, self.script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        # Accept the connection from the subprocess
        try:
            client_sock, _ = self.server_sock.accept()
        except BaseException:
            # The caller never gets the process object in this case, so
            # reap the child (and close its pipes) here.
            process.kill()
            process.communicate()
            raise

        client_file = client_sock.makefile('rwb')
        self.addCleanup(client_file.close)
        self.addCleanup(client_sock.close)

        return process, client_file

    def _read_message(self, client_file):
        """Read and decode one JSON message (a single line) from the debugger.

        Fails the test if the connection was closed instead.
        """
        data = client_file.readline()
        if not data:
            self.fail("Debugger closed the connection before sending a message")
        return json.loads(data.decode())

    def _read_until_prompt(self, client_file):
        """Read messages up to and including the next prompt.

        Returns the list of decoded messages; the last one is the prompt.
        Fails the test if the connection is closed before a prompt arrives.
        """
        messages = []
        while True:
            msg = self._read_message(client_file)
            messages.append(msg)
            if 'prompt' in msg:
                return messages

    def _send_message(self, client_file, payload):
        """Send *payload* to the debugger as one line of JSON."""
        client_file.write(json.dumps(payload).encode() + b"\n")
        client_file.flush()

    def _send_command(self, client_file, command):
        """Helper to send a command to the debugger."""
        self._send_message(client_file, {"reply": command})

    def test_connect_and_basic_commands(self):
        """Test connecting to a remote debugger and sending basic commands."""
        self._create_script()
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # We should receive initial data from the debugger
            initial_data = self._read_message(client_file)
            self.assertIn('message', initial_data)
            self.assertIn('pdb._connect', initial_data['message'])

            # First, look for command_list message
            command_list = self._read_message(client_file)
            self.assertIn('command_list', command_list)

            # Then, look for the first prompt
            prompt_data = self._read_message(client_file)
            self.assertIn('prompt', prompt_data)
            self.assertEqual(prompt_data['state'], 'pdb')

            # Send 'bt' (backtrace) command
            self._send_command(client_file, "bt")

            # Check for response - we should get some stack frames
            messages = self._read_until_prompt(client_file)

            # Extract text messages containing stack info
            text_msg = [msg['message'] for msg in messages
                    if 'message' in msg and 'connect_to_debugger' in msg['message']]

            # Checked before the strict zip() below so that a missing
            # backtrace is reported with this message, not as a ValueError.
            self.assertTrue(text_msg, "Should have received stack trace information")

            expected_stacks = [
                "<module>",
                "connect_to_debugger",
            ]

            for stack, msg in zip(expected_stacks, text_msg, strict=True):
                self.assertIn(stack, msg)

            # Send 'c' (continue) command to let the program finish
            self._send_command(client_file, "c")

            # Wait for process to finish
            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)

            # Check if we got the expected output
            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)

    def test_breakpoints(self):
        """Test setting and hitting breakpoints."""
        self._create_script()
        process, client_file = self._connect_and_get_client_file()
        with kill_on_error(process):
            # Skip initial messages until we get to the prompt
            self._read_until_prompt(client_file)

            # Set a breakpoint on the function bar()
            self._send_command(client_file, "break bar")
            messages = self._read_until_prompt(client_file)
            bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
            self.assertIn("Breakpoint", bp_msg)

            # Continue execution until breakpoint
            self._send_command(client_file, "c")
            messages = self._read_until_prompt(client_file)

            # Verify we hit the breakpoint
            hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
            self.assertIn("bar()", hit_msg)

            # Check breakpoint list
            self._send_command(client_file, "b")
            messages = self._read_until_prompt(client_file)
            list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
            self.assertIn("1   breakpoint", list_msg)
            self.assertIn("breakpoint already hit 1 time", list_msg)

            # Clear breakpoint
            self._send_command(client_file, "clear 1")
            messages = self._read_until_prompt(client_file)
            clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
            self.assertIn("Deleted breakpoint", clear_msg)

            # Continue to end
            self._send_command(client_file, "c")
            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)

            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)

    def test_keyboard_interrupt(self):
        """Test that sending keyboard interrupt breaks into pdb."""

        script = textwrap.dedent(f"""
            import time
            import sys
            import socket
            import pdb
            def bar():
                frame = sys._getframe()  # Get the current frame
                pdb._connect(
                    host='127.0.0.1',
                    port={self.port},
                    frame=frame,
                    commands="",
                    version=pdb._PdbServer.protocol_version(),
                    signal_raising_thread=True,
                    colorize=False,
                )
                print("Connected to debugger")
                iterations = 50
                while iterations > 0:
                    print("Iteration", iterations, flush=True)
                    time.sleep(0.2)
                    iterations -= 1
                return 42

            if __name__ == "__main__":
                print("Function returned:", bar())
            """)
        self._create_script(script=script)
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # Accept a 2nd connection from the subprocess to tell it about
            # signals.  Done inside kill_on_error() so that the subprocess
            # is not leaked if this accept() fails or times out.
            signal_sock, _ = self.server_sock.accept()
            self.addCleanup(signal_sock.close)

            # Skip initial messages until we get to the prompt
            self._read_until_prompt(client_file)

            # Continue execution
            self._send_command(client_file, "c")

            # Confirm that the remote is already in the while loop. We know
            # it's in bar() and we can exit the loop immediately by setting
            # iterations to 0.
            while line := process.stdout.readline():
                if line.startswith("Iteration"):
                    break

            # Send SIGINT's number over the signal connection; the remote
            # is expected to interrupt itself and break back into pdb.
            signal_sock.sendall(signal.SIGINT.to_bytes())
            messages = self._read_until_prompt(client_file)

            # Verify we got the keyboard interrupt message.
            interrupt_msgs = [msg['message'] for msg in messages if 'message' in msg]
            expected_msg = [msg for msg in interrupt_msgs if "bar()" in msg]
            self.assertGreater(len(expected_msg), 0)

            # Continue to end as fast as we can
            self._send_command(client_file, "iterations = 0")
            self._send_command(client_file, "c")
            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)
            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)

    def test_handle_eof(self):
        """Test that EOF signal properly exits the debugger."""
        self._create_script()
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # Skip initial messages until we get to the prompt
            self._read_until_prompt(client_file)

            # Send EOF signal to exit the debugger
            self._send_message(client_file, {"signal": "EOF"})

            # The process should complete normally after receiving EOF
            stdout, stderr = process.communicate(timeout=SHORT_TIMEOUT)

            # Verify process completed correctly
            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)
            self.assertEqual(stderr, "")

    def test_protocol_version(self):
        """Test that incompatible protocol versions are properly detected."""
        # Create a script using an incompatible protocol version
        script = textwrap.dedent(f'''
            import sys
            import pdb

            def run_test():
                frame = sys._getframe()

                # Use a fake version number that's definitely incompatible
                fake_version = 0x01010101 # A fake version that doesn't match any real Python version

                # Connect with the wrong version
                pdb._connect(
                    host='127.0.0.1',
                    port={self.port},
                    frame=frame,
                    commands="",
                    version=fake_version,
                    signal_raising_thread=False,
                    colorize=False,
                )

                # This should print if the debugger detaches correctly
                print("Debugger properly detected version mismatch")
                return True

            if __name__ == "__main__":
                print("Test result:", run_test())
            ''')
        self._create_script(script=script)
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # First message should be an error about protocol version mismatch
            message = self._read_message(client_file)

            self.assertIn('message', message)
            self.assertEqual(message['type'], 'error')
            self.assertIn('incompatible', message['message'])
            self.assertIn('protocol version', message['message'])

            # The process should complete normally
            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)

            # Verify the process completed successfully
            self.assertIn("Test result: True", stdout)
            self.assertIn("Debugger properly detected version mismatch", stdout)
            self.assertEqual(process.returncode, 0)

    def test_help_system(self):
        """Test that the help system properly sends help text to the client."""
        self._create_script()
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # Skip initial messages until we get to the prompt
            self._read_until_prompt(client_file)

            # Request help for different commands
            help_commands = ["help", "help break", "help continue", "help pdb"]

            for cmd in help_commands:
                self._send_command(client_file, cmd)

                # Look for help message
                message = self._read_message(client_file)

                self.assertIn('help', message)

                if cmd == "help":
                    # Bare "help" names no topic, so the payload is empty
                    self.assertEqual(message['help'], "")
                else:
                    # Should contain the specific command we asked for help with
                    command = cmd.split()[1]
                    self.assertEqual(message['help'], command)

                # Skip to the next prompt
                self._read_until_prompt(client_file)

            # Continue execution to finish the program
            self._send_command(client_file, "c")

            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)
            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)

    def test_multi_line_commands(self):
        """Test that multi-line commands work properly over remote connection."""
        self._create_script()
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            # Skip initial messages until we get to the prompt
            self._read_until_prompt(client_file)

            # Each entry is sent as a single multi-line command
            multi_line_commands = [
                # Define a function
                "def test_func():\n    return 42",

                # For loop
                "for i in range(3):\n    print(i)",

                # If statement
                "if True:\n    x = 42\nelse:\n    x = 0",

                # Try/except
                "try:\n    result = 10/2\n    print(result)\nexcept ZeroDivisionError:\n    print('Error')",

                # Class definition
                "class TestClass:\n    def __init__(self):\n        self.value = 100\n    def get_value(self):\n        return self.value"
            ]

            for cmd in multi_line_commands:
                self._send_command(client_file, cmd)
                self._read_until_prompt(client_file)

            # Test executing the defined function
            self._send_command(client_file, "test_func()")
            messages = self._read_until_prompt(client_file)

            # Find the result message
            result_msg = next(msg['message'] for msg in messages if 'message' in msg)
            self.assertIn("42", result_msg)

            # Test creating an instance of the defined class
            self._send_command(client_file, "obj = TestClass()")
            self._read_until_prompt(client_file)

            # Test calling a method on the instance
            self._send_command(client_file, "obj.get_value()")
            messages = self._read_until_prompt(client_file)

            # Find the result message
            result_msg = next(msg['message'] for msg in messages if 'message' in msg)
            self.assertIn("100", result_msg)

            # Continue execution to finish
            self._send_command(client_file, "c")

            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)
            self.assertIn("Function returned: 42", stdout)
            self.assertEqual(process.returncode, 0)

    def test_exec_in_closure_result_uses_pdb_stdout(self):
        """
        Expression results executed via _exec_in_closure() should be written
        to the debugger output stream (pdb stdout), not to sys.stdout.
        """
        self._create_script()
        process, client_file = self._connect_and_get_client_file()

        with kill_on_error(process):
            self._read_until_prompt(client_file)

            self._send_command(client_file, "(lambda: 123)()")
            messages = self._read_until_prompt(client_file)
            result_msg = "".join(msg.get("message", "") for msg in messages)
            self.assertIn("123", result_msg)

            self._send_command(client_file, "sum(i for i in (1, 2, 3))")
            messages = self._read_until_prompt(client_file)
            result_msg = "".join(msg.get("message", "") for msg in messages)
            self.assertIn("6", result_msg)

            self._send_command(client_file, "c")
            stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)

            # The results must not have leaked to the script's own stdout
            self.assertNotIn("\n123\n", stdout)
            self.assertNotIn("\n6\n", stdout)
            self.assertEqual(process.returncode, 0)


def _supports_remote_attaching():
    PROCESS_VM_READV_SUPPORTED = False

    try:
        from _remote_debugging import PROCESS_VM_READV_SUPPORTED
    except ImportError:
        pass

    return PROCESS_VM_READV_SUPPORTED


@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
@unittest.skipIf(sys.platform not in ("darwin", "linux", "win32"),
                    "Test only runs on Linux, Windows and MacOS")
@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
                    "Testing on Linux requires process_vm_readv support")
@cpython_only
@requires_subprocess()
class PdbAttachTestCase(unittest.TestCase):
    """Integration tests for attaching to a running process with ``pdb -p``.

    A target script is started in a subprocess.  It signals that it has
    reached the attachment point by connecting to a socket owned by the
    test, after which ``pdb.main()`` is run in-process as the client with
    ``-p <pid>`` and scripted stdin.
    """

    def setUp(self):
        # Create a server socket that the target connects to once it has
        # reached the point where the test should attach
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('127.0.0.1', 0))  # Let OS assign port
        self.sock.listen(1)
        self.port = self.sock.getsockname()[1]
        self._create_script()

    def _create_script(self, script=None):
        """Write the target script to disk and record its path.

        When *script* is None a default script is used: it pings the test's
        socket from baz() and then loops (for at most 100 iterations of
        0.1s) until the local ``x`` is changed, finally printing its value.
        """
        if script is None:
            script = textwrap.dedent(
                f"""
                import socket
                import time

                def foo():
                    return bar()

                def bar():
                    return baz()

                def baz():
                    x = 1
                    # Trigger attach
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.connect(('127.0.0.1', {self.port}))
                    sock.close()
                    count = 0
                    while x == 1 and count < 100:
                        count += 1
                        time.sleep(0.1)
                    return x

                result = foo()
                print(f"Function returned: {{result}}")
                """
            )

        self.script_path = TESTFN + "_connect_test.py"
        with open(self.script_path, 'w') as f:
            f.write(script)

    def tearDown(self):
        self.sock.close()
        try:
            unlink(self.script_path)
        except OSError:
            pass

    @staticmethod
    def _reap(process):
        """Make sure *process* has terminated, killing it if still running."""
        if process.poll() is None:
            process.kill()
        process.wait()

    def do_integration_test(self, client_stdin):
        """Attach pdb to the target script, feeding it *client_stdin*.

        Returns a dict with the captured ``stdout``/``stderr`` text of both
        the ``"client"`` (the in-process pdb) and the ``"server"`` (the
        target subprocess).  Skips the test when attaching fails for lack
        of permissions, and fails it if the target exits with a non-zero
        status.
        """
        process = subprocess.Popen(
            [sys.executable, self.script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        self.addCleanup(process.stdout.close)
        self.addCleanup(process.stderr.close)
        # Cleanups run last-in first-out, so the target is reaped before its
        # pipes are closed.  This also covers an accept() timeout below and
        # the skip path, where the target would otherwise keep running.
        self.addCleanup(self._reap, process)

        # Wait for the process to reach our attachment point
        self.sock.settimeout(SHORT_TIMEOUT)
        conn, _ = self.sock.accept()
        conn.close()

        client_stdin = io.StringIO(client_stdin)
        client_stdout = io.StringIO()
        client_stderr = io.StringIO()

        self.addCleanup(client_stdin.close)
        self.addCleanup(client_stdout.close)
        self.addCleanup(client_stderr.close)

        with (
            unittest.mock.patch("sys.stdin", client_stdin),
            redirect_stdout(client_stdout),
            redirect_stderr(client_stderr),
            unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
            # Turn pdb's permission-help exit path into an exception that
            # can be converted into a skip below.
            unittest.mock.patch(
                "pdb.exit_with_permission_help_text", side_effect=PermissionError
            ),
        ):
            try:
                pdb.main()
            except PermissionError:
                self.skipTest("Insufficient permissions for remote execution")

        # communicate() drains both pipes while waiting, so the target can
        # never block on a full pipe.
        server_stdout, server_stderr = process.communicate()

        self.assertEqual(
            process.returncode, 0,
            "server failed\n"
            f"server stdout:\n{server_stdout}\n"
            f"server stderr:\n{server_stderr}",
        )
        return {
            "client": {
                "stdout": client_stdout.getvalue(),
                "stderr": client_stderr.getvalue(),
            },
            "server": {
                "stdout": server_stdout,
                "stderr": server_stderr,
            },
        }

    def test_attach_to_process_without_colors(self):
        """Attach with colors disabled: client output has no escape codes."""
        with force_color(False):
            output = self.do_integration_test("ll\nx=42\n")
        self.assertEqual(output["client"]["stderr"], "")
        self.assertEqual(output["server"]["stderr"], "")

        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
        self.assertIn("while x == 1", output["client"]["stdout"])
        self.assertNotIn("\x1b", output["client"]["stdout"])

    def test_attach_to_process_with_colors(self):
        """Attach with colors forced on: the source listing is colorized."""
        with force_color(True):
            output = self.do_integration_test("ll\nx=42\n")
        self.assertEqual(output["client"]["stderr"], "")
        self.assertEqual(output["server"]["stderr"], "")

        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
        self.assertIn("\x1b", output["client"]["stdout"])
        # The raw text is broken up by escape sequences; it only matches
        # once those have been stripped.
        self.assertNotIn("while x == 1", output["client"]["stdout"])
        self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))

    def test_attach_to_non_existent_process(self):
        """``pdb -p`` with a pid assumed not to exist fails with an error."""
        with force_color(False):
            result = subprocess.run(
                [sys.executable, "-m", "pdb", "-p", "999999"],
                text=True,
                capture_output=True,
            )
        self.assertNotEqual(result.returncode, 0)
        if sys.platform == "darwin":
            # On MacOS, attaching to a non-existent process gives PermissionError
            error = "The specified process cannot be attached to due to insufficient permissions"
        else:
            error = "Cannot attach to pid 999999, please make sure that the process exists"
        self.assertIn(error, result.stdout)


# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
