trace('added event {event}', event=event)
self.events.append(event)
- def push(self, char: int | bytes | str) -> None:
+ def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
"""
+ assert isinstance(char, (int, bytes))
ord_char = char if isinstance(char, int) else ord(char)
- if ord_char > 255:
- assert isinstance(char, str)
- char = bytes(char.encode(self.encoding, "replace"))
- self.buf.extend(char)
- else:
- char = bytes(bytearray((ord_char,)))
- self.buf.append(ord_char)
+ char = ord_char.to_bytes()
+ self.buf.append(ord_char)
if char in self.keymap:
if self.keymap is self.compiled_keymap:
mock_keymap.compile_keymap.return_value = {"a": "b"}
eq = self.make_eventqueue()
eq.keymap = {b"a": "b"}
- eq.push("a")
+ eq.push(b"a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "b")
mock_keymap.compile_keymap.return_value = {"a": "b"}
eq = self.make_eventqueue()
eq.keymap = {b"c": "d"}
- eq.push("a")
+ eq.push(b"a")
mock_keymap.compile_keymap.assert_called()
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "a")
mock_keymap.compile_keymap.return_value = {"a": "b"}
eq = self.make_eventqueue()
eq.keymap = {b"a": {b"b": "c"}}
- eq.push("a")
+ eq.push(b"a")
mock_keymap.compile_keymap.assert_called()
self.assertTrue(eq.empty())
- eq.push("b")
+ eq.push(b"b")
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "c")
- eq.push("d")
+ eq.push(b"d")
self.assertEqual(eq.events[1].evt, "key")
self.assertEqual(eq.events[1].data, "d")
mock_keymap.compile_keymap.return_value = {"a": "b"}
eq = self.make_eventqueue()
eq.keymap = {b"a": {b"b": "c"}}
- eq.push("a")
+ eq.push(b"a")
mock_keymap.compile_keymap.assert_called()
self.assertTrue(eq.empty())
eq.flush_buf()
- eq.push("\033")
+ eq.push(b"\033")
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "\033")
- eq.push("b")
+ eq.push(b"b")
self.assertEqual(eq.events[1].evt, "key")
self.assertEqual(eq.events[1].data, "b")
def test_push_special_key(self):
eq = self.make_eventqueue()
eq.keymap = {}
- eq.push("\x1b")
- eq.push("[")
- eq.push("A")
+ eq.push(b"\x1b")
+ eq.push(b"[")
+ eq.push(b"A")
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "\x1b")
def test_push_unrecognized_escape_sequence(self):
eq = self.make_eventqueue()
eq.keymap = {}
- eq.push("\x1b")
- eq.push("[")
- eq.push("Z")
+ eq.push(b"\x1b")
+ eq.push(b"[")
+ eq.push(b"Z")
self.assertEqual(len(eq.events), 3)
self.assertEqual(eq.events[0].evt, "key")
self.assertEqual(eq.events[0].data, "\x1b")
self.assertEqual(eq.events[2].evt, "key")
self.assertEqual(eq.events[2].data, "Z")
- def test_push_unicode_character(self):
+ def test_push_unicode_character_as_str(self):
eq = self.make_eventqueue()
eq.keymap = {}
- eq.push("ч")
- self.assertEqual(eq.events[0].evt, "key")
- self.assertEqual(eq.events[0].data, "ч")
+ with self.assertRaises(AssertionError):
+ eq.push("ч")
+ with self.assertRaises(AssertionError):
+ eq.push("ñ")
+
+ def test_push_unicode_character_two_bytes(self):
+ eq = self.make_eventqueue()
+ eq.keymap = {}
+
+ encoded = "ч".encode(eq.encoding, "replace")
+ self.assertEqual(len(encoded), 2)
+
+ eq.push(encoded[0])
+ e = eq.get()
+ self.assertIsNone(e)
+
+ eq.push(encoded[1])
+ e = eq.get()
+ self.assertEqual(e.evt, "key")
+ self.assertEqual(e.data, "ч")
+
+ def test_push_single_chars_and_unicode_character_as_str(self):
+ eq = self.make_eventqueue()
+ eq.keymap = {}
+
+ def _event(evt, data, raw=None):
+ r = raw if raw is not None else data.encode(eq.encoding)
+ e = Event(evt, data, r)
+ return e
+
+ def _push(keys):
+ for k in keys:
+ eq.push(k)
+
+ self.assertIsInstance("ñ", str)
+
+ # If an exception happens during push, the existing events must be
+ # preserved and we can continue to push.
+ _push(b"b")
+ with self.assertRaises(AssertionError):
+ _push("ñ")
+ _push(b"a")
+
+ self.assertEqual(eq.get(), _event("key", "b"))
+ self.assertEqual(eq.get(), _event("key", "a"))
@unittest.skipIf(support.MS_WINDOWS, "No Unix event queue on Windows")