import termios
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import cast
+else:
+ cast = lambda typ, val: val
+
+
class TermState:
- def __init__(self, tuples):
- (
- self.iflag,
- self.oflag,
- self.cflag,
- self.lflag,
- self.ispeed,
- self.ospeed,
- self.cc,
- ) = tuples
+ def __init__(self, attrs: list[int | list[bytes]]) -> None:
+ self.iflag = cast(int, attrs[0])
+ self.oflag = cast(int, attrs[1])
+ self.cflag = cast(int, attrs[2])
+ self.lflag = cast(int, attrs[3])
+ self.ispeed = cast(int, attrs[4])
+ self.ospeed = cast(int, attrs[5])
+ self.cc = cast(list[bytes], attrs[6])
- def as_list(self):
+ def as_list(self) -> list[int | list[bytes]]:
return [
self.iflag,
self.oflag,
self.cc[:],
]
- def copy(self):
+ def copy(self) -> "TermState":
return self.__class__(self.as_list())
-def tcgetattr(fd):
+def tcgetattr(fd: int) -> TermState:
return TermState(termios.tcgetattr(fd))
-def tcsetattr(fd, when, attrs):
+def tcsetattr(fd: int, when: int, attrs: TermState) -> None:
termios.tcsetattr(fd, when, attrs.as_list())
class Term(TermState):
TS__init__ = TermState.__init__
- def __init__(self, fd=0):
+ def __init__(self, fd: int = 0) -> None:
self.TS__init__(termios.tcgetattr(fd))
self.fd = fd
- self.stack = []
+ self.stack: list[list[int | list[bytes]]] = []
- def save(self):
+ def save(self) -> None:
self.stack.append(self.as_list())
- def set(self, when=termios.TCSANOW):
+ def set(self, when: int = termios.TCSANOW) -> None:
termios.tcsetattr(self.fd, when, self.as_list())
- def restore(self):
+ def restore(self) -> None:
self.TS__init__(self.stack.pop())
self.set()
from . import curses
from .console import Console, Event
-from .fancy_termios import tcgetattr, tcsetattr
+from .fancy_termios import tcgetattr, tcsetattr, TermState
from .trace import trace
from .unix_eventqueue import EventQueue
from .utils import wlen
# types
if TYPE_CHECKING:
- from typing import IO, Literal, overload
+ from typing import AbstractSet, IO, Literal, overload, cast
else:
overload = lambda func: None
+ cast = lambda typ, val: val
class InvalidTerminal(RuntimeError):
- pass
+ def __init__(self, message: str) -> None:
+ super().__init__(errno.EIO, message)
_error = (termios.error, curses.error, InvalidTerminal)
+_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])
SIGWINCH_EVENT = "repaint"
def register(self, fd, flag):
self.fd = fd
+
# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
- r, w, e = select.select([self.fd], [], [], timeout/1000)
+ r, w, e = select.select([self.fd], [], [], timeout / 1000)
return r
poll = MinimalPoll # type: ignore[assignment]
and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
)
+ try:
+ self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
+ except _error as e:
+ raise RuntimeError(f"termios failure ({e.args[1]})")
+
@overload
- def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
+ def _my_getstr(
+ cap: str, optional: Literal[False] = False
+ ) -> bytes: ...
@overload
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
self.input_buffer_pos = 0
return ret
-
def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
"""
Prepare the console for input/output operations.
"""
+ self.__buffer = []
+
self.__svtermstate = tcgetattr(self.input_fd)
raw = self.__svtermstate.copy()
raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
raw.lflag |= termios.ISIG
raw.cc[termios.VMIN] = 1
raw.cc[termios.VTIME] = 0
- try:
- tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
- except termios.error as e:
- if e.args[0] != errno.EIO:
- # gh-135329: when running under external programs (like strace),
- # tcsetattr may fail with EIO. We can safely ignore this
- # and continue with default terminal settings.
- raise
+ self.__input_fd_set(raw)
# In macOS terminal we need to deactivate line wrap via ANSI escape code
if self.is_apple_terminal:
self.screen = []
self.height, self.width = self.getheightwidth()
- self.__buffer = []
-
self.posxy = 0, 0
self.__gone_tall = 0
self.__move = self.__move_short
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
- try:
- tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
- except termios.error as e:
- if e.args[0] != errno.EIO:
- raise
+ self.__input_fd_set(self.__svtermstate)
if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7h")
os.write(self.output_fd, self._pad * nchars)
else:
time.sleep(float(delay) / 1000.0)
+
+ def __input_fd_set(
+ self,
+ state: TermState,
+ ignore: AbstractSet[int] = _error_codes_to_ignore,
+ ) -> bool:
+ try:
+ tcsetattr(self.input_fd, termios.TCSADRAIN, state)
+ except termios.error as te:
+ if te.args[0] not in ignore:
+ raise
+ return False
+ else:
+ return True