**/*itertools* @rhettinger
**/*collections* @rhettinger
**/*random* @rhettinger
-**/*queue* @rhettinger
**/*bisect* @rhettinger
**/*heapq* @rhettinger
**/*functools* @rhettinger
--- /dev/null
+# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+OS-independent base for an event and VT sequence scanner
+
+See unix_eventqueue and windows_eventqueue for subclasses.
+"""
+
+from collections import deque
+
+from . import keymap
+from .console import Event
+from .trace import trace
+
+class BaseEventQueue:
+ def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
+ self.compiled_keymap = keymap.compile_keymap(keymap_dict)
+ self.keymap = self.compiled_keymap
+ trace("keymap {k!r}", k=self.keymap)
+ self.encoding = encoding
+ self.events: deque[Event] = deque()
+ self.buf = bytearray()
+
+ def get(self) -> Event | None:
+ """
+ Retrieves the next event from the queue.
+ """
+ if self.events:
+ return self.events.popleft()
+ else:
+ return None
+
+ def empty(self) -> bool:
+ """
+ Checks if the queue is empty.
+ """
+ return not self.events
+
+ def flush_buf(self) -> bytearray:
+ """
+ Flushes the buffer and returns its contents.
+ """
+ old = self.buf
+ self.buf = bytearray()
+ return old
+
+ def insert(self, event: Event) -> None:
+ """
+ Inserts an event into the queue.
+ """
+ trace('added event {event}', event=event)
+ self.events.append(event)
+
+ def push(self, char: int | bytes) -> None:
+ """
+ Processes a character by updating the buffer and handling special key mappings.
+ """
+ ord_char = char if isinstance(char, int) else ord(char)
+ char = bytes(bytearray((ord_char,)))
+ self.buf.append(ord_char)
+ if char in self.keymap:
+ if self.keymap is self.compiled_keymap:
+ # sanity check, buffer is empty when a special key comes
+ assert len(self.buf) == 1
+ k = self.keymap[char]
+ trace('found map {k!r}', k=k)
+ if isinstance(k, dict):
+ self.keymap = k
+ else:
+ self.insert(Event('key', k, self.flush_buf()))
+ self.keymap = self.compiled_keymap
+
+ elif self.buf and self.buf[0] == 27: # escape
+ # escape sequence not recognized by our keymap: propagate it
+ # outside so that i can be recognized as an M-... key (see also
+ # the docstring in keymap.py
+ trace('unrecognized escape sequence, propagating...')
+ self.keymap = self.compiled_keymap
+ self.insert(Event('key', '\033', bytearray(b'\033')))
+ for _c in self.flush_buf()[1:]:
+ self.push(_c)
+
+ else:
+ try:
+ decoded = bytes(self.buf).decode(self.encoding)
+ except UnicodeError:
+ return
+ else:
+ self.insert(Event('key', decoded, self.flush_buf()))
+ self.keymap = self.compiled_keymap
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-from collections import deque
-
-from . import keymap
-from .console import Event
from . import curses
from .trace import trace
+from .base_eventqueue import BaseEventQueue
from termios import tcgetattr, VERASE
import os
keycodes.update(CTRL_ARROW_KEYCODES)
return keycodes
-class EventQueue:
+class EventQueue(BaseEventQueue):
def __init__(self, fd: int, encoding: str) -> None:
- self.keycodes = get_terminal_keycodes()
+ keycodes = get_terminal_keycodes()
if os.isatty(fd):
backspace = tcgetattr(fd)[6][VERASE]
- self.keycodes[backspace] = "backspace"
- self.compiled_keymap = keymap.compile_keymap(self.keycodes)
- self.keymap = self.compiled_keymap
- trace("keymap {k!r}", k=self.keymap)
- self.encoding = encoding
- self.events: deque[Event] = deque()
- self.buf = bytearray()
-
- def get(self) -> Event | None:
- """
- Retrieves the next event from the queue.
- """
- if self.events:
- return self.events.popleft()
- else:
- return None
-
- def empty(self) -> bool:
- """
- Checks if the queue is empty.
- """
- return not self.events
-
- def flush_buf(self) -> bytearray:
- """
- Flushes the buffer and returns its contents.
- """
- old = self.buf
- self.buf = bytearray()
- return old
-
- def insert(self, event: Event) -> None:
- """
- Inserts an event into the queue.
- """
- trace('added event {event}', event=event)
- self.events.append(event)
-
- def push(self, char: int | bytes) -> None:
- """
- Processes a character by updating the buffer and handling special key mappings.
- """
- ord_char = char if isinstance(char, int) else ord(char)
- char = bytes(bytearray((ord_char,)))
- self.buf.append(ord_char)
- if char in self.keymap:
- if self.keymap is self.compiled_keymap:
- #sanity check, buffer is empty when a special key comes
- assert len(self.buf) == 1
- k = self.keymap[char]
- trace('found map {k!r}', k=k)
- if isinstance(k, dict):
- self.keymap = k
- else:
- self.insert(Event('key', k, self.flush_buf()))
- self.keymap = self.compiled_keymap
-
- elif self.buf and self.buf[0] == 27: # escape
- # escape sequence not recognized by our keymap: propagate it
- # outside so that i can be recognized as an M-... key (see also
- # the docstring in keymap.py
- trace('unrecognized escape sequence, propagating...')
- self.keymap = self.compiled_keymap
- self.insert(Event('key', '\033', bytearray(b'\033')))
- for _c in self.flush_buf()[1:]:
- self.push(_c)
-
- else:
- try:
- decoded = bytes(self.buf).decode(self.encoding)
- except UnicodeError:
- return
- else:
- self.insert(Event('key', decoded, self.flush_buf()))
- self.keymap = self.compiled_keymap
+ keycodes[backspace] = "backspace"
+ BaseEventQueue.__init__(self, encoding, keycodes)
from .console import Event, Console
from .trace import trace
from .utils import wlen
+from .windows_eventqueue import EventQueue
try:
from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
0x83: "f20", # VK_F20
}
-# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
+# Virtual terminal output sequences
+# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences
+# Check `windows_eventqueue.py` for input sequences
ERASE_IN_LINE = "\x1b[K"
MOVE_LEFT = "\x1b[{}D"
MOVE_RIGHT = "\x1b[{}C"
class _error(Exception):
pass
+def _supports_vt():
+ try:
+ import nt
+ return nt._supports_virtual_terminal()
+ except (ImportError, AttributeError):
+ return False
class WindowsConsole(Console):
def __init__(
):
super().__init__(f_in, f_out, term, encoding)
+ self.__vt_support = _supports_vt()
+
+ if self.__vt_support:
+ trace('console supports virtual terminal')
+
+ # Save original console modes so we can recover on cleanup.
+ original_input_mode = DWORD()
+ GetConsoleMode(InHandle, original_input_mode)
+ trace(f'saved original input mode 0x{original_input_mode.value:x}')
+ self.__original_input_mode = original_input_mode.value
+
SetConsoleMode(
OutHandle,
ENABLE_WRAP_AT_EOL_OUTPUT
| ENABLE_PROCESSED_OUTPUT
| ENABLE_VIRTUAL_TERMINAL_PROCESSING,
)
+
self.screen: list[str] = []
self.width = 80
self.height = 25
self.__offset = 0
- self.event_queue: deque[Event] = deque()
+ self.event_queue = EventQueue(encoding)
try:
self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
except ValueError:
def _disable_blinking(self):
self.__write("\x1b[?12l")
+ def _enable_bracketed_paste(self) -> None:
+ self.__write("\x1b[?2004h")
+
+ def _disable_bracketed_paste(self) -> None:
+ self.__write("\x1b[?2004l")
+
def __write(self, text: str) -> None:
if "\x1a" in text:
text = ''.join(["^Z" if x == '\x1a' else x for x in text])
self.__gone_tall = 0
self.__offset = 0
+ if self.__vt_support:
+ SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT)
+ self._enable_bracketed_paste()
+
def restore(self) -> None:
- pass
+ if self.__vt_support:
+ # Recover to original mode before running REPL
+ self._disable_bracketed_paste()
+ SetConsoleMode(InHandle, self.__original_input_mode)
def _move_relative(self, x: int, y: int) -> None:
"""Moves relative to the current posxy"""
raise ValueError(f"Bad cursor position {x}, {y}")
if y < self.__offset or y >= self.__offset + self.height:
- self.event_queue.insert(0, Event("scroll", ""))
+ self.event_queue.insert(Event("scroll", ""))
else:
self._move_relative(x, y)
self.posxy = x, y
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
completion of an event."""
- if self.event_queue:
- return self.event_queue.pop()
- while True:
+ while self.event_queue.empty():
rec = self._read_input(block)
if rec is None:
return None
key = f"ctrl {key}"
elif key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
- self.event_queue.insert(0, Event(evt="key", data=key, raw=key))
+ self.event_queue.insert(Event(evt="key", data=key, raw=key))
return Event(evt="key", data="\033") # keymap.py uses this for meta
return Event(evt="key", data=key, raw=key)
if block:
continue
return None
+ elif self.__vt_support:
+ # If virtual terminal is enabled, scanning VT sequences
+ self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar)
+ continue
if key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
- self.event_queue.insert(0, Event(evt="key", data=key, raw=raw_key))
+ self.event_queue.insert(Event(evt="key", data=key, raw=raw_key))
return Event(evt="key", data="\033") # keymap.py uses this for meta
return Event(evt="key", data=key, raw=raw_key)
+ return self.event_queue.get()
def push_char(self, char: int | bytes) -> None:
"""
MOUSE_EVENT = 0x02
WINDOW_BUFFER_SIZE_EVENT = 0x04
+ENABLE_PROCESSED_INPUT = 0x0001
+ENABLE_LINE_INPUT = 0x0002
+ENABLE_ECHO_INPUT = 0x0004
+ENABLE_MOUSE_INPUT = 0x0010
+ENABLE_INSERT_MODE = 0x0020
+ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
+
ENABLE_PROCESSED_OUTPUT = 0x01
ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
]
ScrollConsoleScreenBuffer.restype = BOOL
+ GetConsoleMode = _KERNEL32.GetConsoleMode
+ GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)]
+ GetConsoleMode.restype = BOOL
+
SetConsoleMode = _KERNEL32.SetConsoleMode
SetConsoleMode.argtypes = [HANDLE, DWORD]
SetConsoleMode.restype = BOOL
GetStdHandle = _win_only
GetConsoleScreenBufferInfo = _win_only
ScrollConsoleScreenBuffer = _win_only
+ GetConsoleMode = _win_only
SetConsoleMode = _win_only
ReadConsoleInput = _win_only
GetNumberOfConsoleInputEvents = _win_only
--- /dev/null
+"""
+Windows event and VT sequence scanner
+"""
+
+from .base_eventqueue import BaseEventQueue
+
+
+# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences
+VT_MAP: dict[bytes, str] = {
+ b'\x1b[A': 'up',
+ b'\x1b[B': 'down',
+ b'\x1b[C': 'right',
+ b'\x1b[D': 'left',
+ b'\x1b[1;5D': 'ctrl left',
+ b'\x1b[1;5C': 'ctrl right',
+
+ b'\x1b[H': 'home',
+ b'\x1b[F': 'end',
+
+ b'\x7f': 'backspace',
+ b'\x1b[2~': 'insert',
+ b'\x1b[3~': 'delete',
+ b'\x1b[5~': 'page up',
+ b'\x1b[6~': 'page down',
+
+ b'\x1bOP': 'f1',
+ b'\x1bOQ': 'f2',
+ b'\x1bOR': 'f3',
+ b'\x1bOS': 'f4',
+ b'\x1b[15~': 'f5',
+ b'\x1b[17~': 'f6',
+ b'\x1b[18~': 'f7',
+ b'\x1b[19~': 'f8',
+ b'\x1b[20~': 'f9',
+ b'\x1b[21~': 'f10',
+ b'\x1b[23~': 'f11',
+ b'\x1b[24~': 'f12',
+}
+
+class EventQueue(BaseEventQueue):
+ def __init__(self, encoding: str) -> None:
+ BaseEventQueue.__init__(self, encoding, VT_MAP)
import unittest
import sys
from unittest.mock import patch
+from test import support
try:
from _pyrepl.console import Event
- from _pyrepl.unix_eventqueue import EventQueue
+ from _pyrepl import base_eventqueue
except ImportError:
pass
-@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
-@patch("_pyrepl.curses.tigetstr", lambda x: b"")
-class TestUnixEventQueue(unittest.TestCase):
- def setUp(self):
- self.file = tempfile.TemporaryFile()
+try:
+ from _pyrepl import unix_eventqueue
+except ImportError:
+ pass
- def tearDown(self) -> None:
- self.file.close()
+try:
+ from _pyrepl import windows_eventqueue
+except ImportError:
+ pass
+
+class EventQueueTestBase:
+ """OS-independent mixin"""
+ def make_eventqueue(self) -> base_eventqueue.BaseEventQueue:
+ raise NotImplementedError()
def test_get(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
event = Event("key", "a", b"a")
eq.insert(event)
self.assertEqual(eq.get(), event)
def test_empty(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
self.assertTrue(eq.empty())
eq.insert(Event("key", "a", b"a"))
self.assertFalse(eq.empty())
def test_flush_buf(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.buf.extend(b"test")
self.assertEqual(eq.flush_buf(), b"test")
self.assertEqual(eq.buf, bytearray())
def test_insert(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
event = Event("key", "a", b"a")
eq.insert(event)
self.assertEqual(eq.events[0], event)
- @patch("_pyrepl.unix_eventqueue.keymap")
+ @patch("_pyrepl.base_eventqueue.keymap")
def test_push_with_key_in_keymap(self, mock_keymap):
mock_keymap.compile_keymap.return_value = {"a": "b"}
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {b"a": "b"}
eq.push("a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "b")
- @patch("_pyrepl.unix_eventqueue.keymap")
+ @patch("_pyrepl.base_eventqueue.keymap")
def test_push_without_key_in_keymap(self, mock_keymap):
mock_keymap.compile_keymap.return_value = {"a": "b"}
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {b"c": "d"}
eq.push("a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "a")
- @patch("_pyrepl.unix_eventqueue.keymap")
+ @patch("_pyrepl.base_eventqueue.keymap")
def test_push_with_keymap_in_keymap(self, mock_keymap):
mock_keymap.compile_keymap.return_value = {"a": "b"}
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {b"a": {b"b": "c"}}
eq.push("a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[1].evt, "key")
self.assertEqual(eq.events[1].data, "d")
- @patch("_pyrepl.unix_eventqueue.keymap")
+ @patch("_pyrepl.base_eventqueue.keymap")
def test_push_with_keymap_in_keymap_and_escape(self, mock_keymap):
mock_keymap.compile_keymap.return_value = {"a": "b"}
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {b"a": {b"b": "c"}}
eq.push("a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[1].data, "b")
def test_push_special_key(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {}
eq.push("\x1b")
eq.push("[")
self.assertEqual(eq.events[0].data, "\x1b")
def test_push_unrecognized_escape_sequence(self):
- eq = EventQueue(self.file.fileno(), "utf-8")
+ eq = self.make_eventqueue()
eq.keymap = {}
eq.push("\x1b")
eq.push("[")
self.assertEqual(eq.events[1].data, "[")
self.assertEqual(eq.events[2].evt, "key")
self.assertEqual(eq.events[2].data, "Z")
+
+
+@unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows")
+class TestUnixEventQueue(EventQueueTestBase, unittest.TestCase):
+ def setUp(self):
+ self.enterContext(patch("_pyrepl.curses.tigetstr", lambda x: b""))
+ self.file = tempfile.TemporaryFile()
+
+ def tearDown(self) -> None:
+ self.file.close()
+
+ def make_eventqueue(self) -> base_eventqueue.BaseEventQueue:
+ return unix_eventqueue.EventQueue(self.file.fileno(), "utf-8")
+
+
+@unittest.skipUnless(support.MS_WINDOWS, "No Windows event queue on Unix")
+class TestWindowsEventQueue(EventQueueTestBase, unittest.TestCase):
+ def make_eventqueue(self) -> base_eventqueue.BaseEventQueue:
+ return windows_eventqueue.EventQueue("utf-8")
--- /dev/null
+Turn on virtual terminal mode and enable bracketed paste in REPL on Windows
+console. (If the terminal does not support bracketed paste, enabling it
+does nothing.)