from __future__ import annotations
import os
import time
+from typing import TYPE_CHECKING
# Categories of actions:
# killing
# finishing
# [completion]
+from .render import RenderedScreen
from .trace import trace
# types
-if False:
+if TYPE_CHECKING:
from .historical_reader import HistoricalReader
else:
r.kill_ring.append(text)
r.pos = start
- r.dirty = True
+ r.invalidate_buffer(start)
class YankCommand(Command):
r.arg = 10 * r.arg - d
else:
r.arg = 10 * r.arg + d
- r.dirty = True
+ r.invalidate_prompt()
class clear_screen(Command):
def do(self) -> None:
r = self.reader
+ trace("command.clear_screen")
r.console.clear()
- r.dirty = True
+ r.invalidate_full()
class refresh(Command):
def do(self) -> None:
- self.reader.dirty = True
+ trace("command.refresh")
+ self.reader.invalidate_full()
class repaint(Command):
def do(self) -> None:
- self.reader.dirty = True
+ trace("command.repaint")
+ self.reader.invalidate_full()
self.reader.console.repaint()
repl = len(r.kill_ring[-1])
r.kill_ring.insert(0, r.kill_ring.pop())
t = r.kill_ring[-1]
+ start = r.pos - repl
b[r.pos - repl : r.pos] = t
r.pos = r.pos - repl + len(t)
- r.dirty = True
+ r.invalidate_buffer(start)
class interrupt(FinishCommand):
r.console.prepare()
r.pos = p
# r.posxy = 0, 0 # XXX this is invalid
- r.dirty = True
- r.console.screen = []
+ r.invalidate_full()
+ trace("command.suspend sync_rendered_screen")
+ r.console.sync_rendered_screen(RenderedScreen.empty(), r.console.posxy)
class up(MotionCommand):
def do(self) -> None:
r = self.reader
text = self.event * r.get_arg()
+ start = r.pos
r.insert(text)
if r.paste_mode:
data = ""
data += ev.data
if data:
r.insert(data)
- r.last_refresh_cache.invalidated = True
+ r.invalidate_buffer(start)
class insert_nl(EditCommand):
del b[s]
b.insert(t, c)
r.pos = t
- r.dirty = True
+ r.invalidate_buffer(s)
class backspace(EditCommand):
def do(self) -> None:
r = self.reader
b = r.buffer
+ changed_from: int | None = None
for i in range(r.get_arg()):
if r.pos > 0:
r.pos -= 1
del b[r.pos]
- r.dirty = True
+ changed_from = r.pos if changed_from is None else min(changed_from, r.pos)
else:
self.reader.error("can't backspace at start")
+ if changed_from is not None:
+ r.invalidate_buffer(changed_from)
class delete(EditCommand):
r.console.finish()
raise EOFError
+ changed_from: int | None = None
for i in range(r.get_arg()):
if r.pos != len(b):
del b[r.pos]
- r.dirty = True
+ changed_from = r.pos if changed_from is None else min(changed_from, r.pos)
else:
self.reader.error("end of buffer")
+ if changed_from is not None:
+ r.invalidate_buffer(changed_from)
class accept(FinishCommand):
with self.reader.suspend():
self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment]
+ self.reader.invalidate_prompt()
class invalid_key(Command):
from .pager import get_pager
from site import gethistoryfile
+ # After the pager exits, the screen state is unknown (Unix may
+ # restore via alternate screen, Windows shows pager output).
+ # Clear and force a full redraw at the end for consistency.
+ self.reader.console.clear()
+
history = os.linesep.join(self.reader.history[:])
self.reader.console.restore()
pager = get_pager()
pager(history, gethistoryfile())
self.reader.console.prepare()
- # We need to copy over the state so that it's consistent between
- # console and reader, and console does not overwrite/append stuff
- self.reader.console.screen = self.reader.screen.copy()
- self.reader.console.posxy = self.reader.cxy
+ self.reader.invalidate_full()
class paste_mode(Command):
def do(self) -> None:
self.reader.paste_mode = not self.reader.paste_mode
- self.reader.dirty = True
+ self.reader.invalidate_prompt()
class perform_bracketed_paste(Command):
s=time.time() - start,
)
self.reader.insert(data.replace(done, ""))
- self.reader.last_refresh_cache.invalidated = True
from __future__ import annotations
from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
import re
from . import commands, console, reader
+from .render import RenderLine, ScreenOverlay
from .reader import Reader
# types
Command = commands.Command
-TYPE_CHECKING = False
if TYPE_CHECKING:
- from .types import KeySpec, CommandName, CompletionAction
+ from .types import CommandName, CompletionAction, Keymap, KeySpec
def prefix(wordlist: list[str], j: int = 0) -> str:
r.cmpltn_action = None # consumed
if msg:
r.msg = msg
+ r.cmpltn_message_visible = True
+ r.invalidate_message()
else: # other input since last tab: cancel action
r.cmpltn_action = None
completion = stripcolor(completions[0])
if completions_unchangable and len(completion) == len(stem):
r.msg = "[ sole completion ]"
- r.dirty = True
+ r.cmpltn_message_visible = True
+ r.invalidate_message()
r.insert(completion[len(stem):])
else:
clean_completions = [stripcolor(word) for word in completions]
r.insert(p)
if last_is_completer:
r.cmpltn_menu_visible = True
- r.cmpltn_message_visible = False
r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
r.console, completions, r.cmpltn_menu_end,
r.use_brackets, r.sort_in_column)
- r.dirty = True
+ if r.msg:
+ r.msg = ""
+ r.cmpltn_message_visible = False
+ r.invalidate_message()
+ r.invalidate_overlay()
elif not r.cmpltn_menu_visible:
- r.cmpltn_message_visible = True
if stem + p in clean_completions:
r.msg = "[ complete but not unique ]"
- r.dirty = True
+ r.cmpltn_message_visible = True
+ r.invalidate_message()
else:
r.msg = "[ not unique ]"
- r.dirty = True
+ r.cmpltn_message_visible = True
+ r.invalidate_message()
if r.cmpltn_action:
if r.msg and r.cmpltn_message_visible:
else:
r.msg = r.cmpltn_action[0]
r.cmpltn_message_visible = True
- r.dirty = True
+ r.invalidate_message()
class self_insert(commands.self_insert):
r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
r.console, completions, 0,
r.use_brackets, r.sort_in_column)
+ r.invalidate_overlay()
else:
r.cmpltn_reset()
self.commands[c.__name__] = c
self.commands[c.__name__.replace('_', '-')] = c
- def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ def collect_keymap(self) -> Keymap:
return super().collect_keymap() + (
(r'\t', 'complete'),)
if not isinstance(cmd, (complete, self_insert)):
self.cmpltn_reset()
- def calc_screen(self) -> list[str]:
- screen = super().calc_screen()
- if self.cmpltn_menu_visible:
- # We display the completions menu below the current prompt
- ly = self.lxy[1] + 1
- screen[ly:ly] = self.cmpltn_menu
- # If we're not in the middle of multiline edit, don't append to screeninfo
- # since that screws up the position calculation in pos2xy function.
- # This is a hack to prevent the cursor jumping
- # into the completions menu when pressing left or down arrow.
- if self.pos != len(self.buffer):
- self.screeninfo[ly:ly] = [(0, [])]*len(self.cmpltn_menu)
- return screen
+ def get_screen_overlays(self) -> tuple[ScreenOverlay, ...]:
+ if not self.cmpltn_menu_visible:
+ return ()
+ return (
+ ScreenOverlay(
+ self.lxy[1] + 1,
+ tuple(RenderLine.from_rendered_text(line) for line in self.cmpltn_menu),
+ insert=True,
+ ),
+ )
def finish(self) -> None:
super().finish()
self.cmpltn_reset()
def cmpltn_reset(self) -> None:
+ if getattr(self, "cmpltn_menu_visible", False):
+ self.invalidate_overlay()
self.cmpltn_menu = []
self.cmpltn_menu_visible = False
self.cmpltn_message_visible = False
from __future__ import annotations
+import os
import _colorize
from abc import ABC, abstractmethod
import ast
import code
import linecache
-from dataclasses import dataclass, field
-import os.path
+from dataclasses import dataclass
import re
import sys
+from typing import TYPE_CHECKING
-
-TYPE_CHECKING = False
+from .render import RenderedScreen
+from .trace import trace
if TYPE_CHECKING:
- from typing import IO
- from typing import Callable
+ from typing import Callable, IO
+
+ from .types import CursorXY
@dataclass
@dataclass
class Console(ABC):
- posxy: tuple[int, int]
- screen: list[str] = field(default_factory=list)
+ posxy: CursorXY = (0, 0)
height: int = 25
width: int = 80
+ _redraw_debug_palette: tuple[str, ...] = (
+ "\x1b[41m",
+ "\x1b[42m",
+ "\x1b[43m",
+ "\x1b[44m",
+ "\x1b[45m",
+ "\x1b[46m",
+ )
def __init__(
self,
else:
self.output_fd = f_out.fileno()
+ self.posxy = (0, 0)
+ self.height = 25
+ self.width = 80
+ self._rendered_screen = RenderedScreen.empty()
+ self._redraw_visual_cycle = 0
+
+ @property
+ def screen(self) -> list[str]:
+ return list(self._rendered_screen.screen_lines)
+
+ def sync_rendered_screen(
+ self,
+ rendered_screen: RenderedScreen,
+ posxy: CursorXY | None = None,
+ ) -> None:
+ if posxy is None:
+ posxy = rendered_screen.cursor
+ self.posxy = posxy
+ self._rendered_screen = rendered_screen
+ trace(
+ "console.sync_rendered_screen lines={lines} cursor={cursor}",
+ lines=len(rendered_screen.composed_lines),
+ cursor=posxy,
+ )
+
+ def invalidate_render_state(self) -> None:
+ self._rendered_screen = RenderedScreen.empty()
+ trace("console.invalidate_render_state")
+
+ def begin_redraw_visualization(self) -> str | None:
+ if "PYREPL_VISUALIZE_REDRAWS" not in os.environ:
+ return None
+
+ palette = self._redraw_debug_palette
+ cycle = self._redraw_visual_cycle
+ style = palette[cycle % len(palette)]
+ self._redraw_visual_cycle = cycle + 1
+ trace(
+ "console.begin_redraw_visualization cycle={cycle} style={style!r}",
+ cycle=cycle,
+ style=style,
+ )
+ return style
+
@abstractmethod
- def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
+ def refresh(self, rendered_screen: RenderedScreen) -> None: ...
@abstractmethod
def prepare(self) -> None: ...
--- /dev/null
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from .utils import ColorSpan, StyleRef, THEME, iter_display_chars, unbracket, wlen
+
+
+@dataclass(frozen=True, slots=True)
+class ContentFragment:
+ """A single display character with its visual width and style.
+
+ The body of ``>>> def greet`` becomes one fragment per character::
+
+ d e f g r e e t
+ ╰──┴──╯ ╰──┴──┴──┴──╯
+ keyword (unstyled)
+
+ e.g. ``ContentFragment("d", 1, StyleRef(tag="keyword"))``.
+ """
+
+ text: str
+ width: int
+ style: StyleRef = StyleRef()
+
+
+@dataclass(frozen=True, slots=True)
+class PromptContent:
+ """The prompt split into leading full-width lines and an inline portion.
+
+ For the common ``">>> "`` prompt (no newlines)::
+
+ >>> def greet(name):
+ ╰─╯
+ text=">>> ", width=4, leading_lines=()
+
+ If ``sys.ps1`` contains newlines, e.g. ``"Python 3.13\\n>>> "``::
+
+ Python 3.13 ← leading_lines[0]
+ >>> def greet(name):
+ ╰─╯
+ text=">>> ", width=4
+ """
+
+ leading_lines: tuple[ContentFragment, ...]
+ text: str
+ width: int
+
+
+@dataclass(frozen=True, slots=True)
+class SourceLine:
+ """One logical line from the editor buffer, before styling.
+
+ Given this two-line input in the REPL::
+
+ >>> def greet(name):
+ ... return name
+ ▲ cursor
+
+ The buffer ``"def greet(name):\\n return name"`` yields::
+
+ SourceLine(lineno=0, text="def greet(name):",
+ start_offset=0, has_newline=True)
+ SourceLine(lineno=1, text=" return name",
+ start_offset=17, cursor_index=14)
+ """
+
+ lineno: int
+ text: str
+ start_offset: int
+ has_newline: bool
+ cursor_index: int | None = None
+
+ @property
+ def cursor_on_line(self) -> bool:
+ return self.cursor_index is not None
+
+
+@dataclass(frozen=True, slots=True)
+class ContentLine:
+ """A logical line paired with its prompt and styled body.
+
+ For ``>>> def greet(name):``::
+
+ >>> def greet(name):
+ ╰─╯ ╰──────────────╯
+ prompt body: one ContentFragment per character
+ """
+
+ source: SourceLine
+ prompt: PromptContent
+ body: tuple[ContentFragment, ...]
+
+
+def process_prompt(prompt: str) -> PromptContent:
+ r"""Return prompt content with width measured without zero-width markup."""
+
+ prompt_text = unbracket(prompt, including_content=False)
+ visible_prompt = unbracket(prompt, including_content=True)
+ leading_lines: list[ContentFragment] = []
+
+ while "\n" in prompt_text:
+ leading_text, _, prompt_text = prompt_text.partition("\n")
+ visible_leading, _, visible_prompt = visible_prompt.partition("\n")
+ leading_lines.append(ContentFragment(leading_text, wlen(visible_leading)))
+
+ return PromptContent(tuple(leading_lines), prompt_text, wlen(visible_prompt))
+
+
+def build_body_fragments(
+ buffer: str,
+ colors: list[ColorSpan] | None,
+ start_index: int,
+) -> tuple[ContentFragment, ...]:
+ """Convert a line's text into styled content fragments."""
+ # Two separate loops to avoid the THEME() call in the common uncolored path.
+ if colors is None:
+ return tuple(
+ ContentFragment(
+ styled_char.text,
+ styled_char.width,
+ StyleRef(),
+ )
+ for styled_char in iter_display_chars(buffer, colors, start_index)
+ )
+
+ theme = THEME()
+ return tuple(
+ ContentFragment(
+ styled_char.text,
+ styled_char.width,
+ StyleRef.from_tag(styled_char.tag, theme[styled_char.tag])
+ if styled_char.tag
+ else StyleRef(),
+ )
+ for styled_char in iter_display_chars(buffer, colors, start_index)
+ )
if r.get_unicode() != r.history[r.historyi]:
r.buffer = list(r.history[r.historyi])
r.pos = len(r.buffer)
- r.dirty = True
+ r.invalidate_buffer(0)
class first_history(commands.Command):
o = len(r.yank_arg_yanked)
else:
o = 0
+ start = r.pos - o
b[r.pos - o : r.pos] = list(w)
r.yank_arg_yanked = w
r.pos += len(w) - o
- r.dirty = True
+ r.invalidate_buffer(start)
class forward_history_isearch(commands.Command):
r.isearch_direction = ISEARCH_DIRECTION_FORWARDS
r.isearch_start = r.historyi, r.pos
r.isearch_term = ""
- r.dirty = True
+ r.invalidate_prompt()
r.push_input_trans(r.isearch_trans)
def do(self) -> None:
r = self.reader
r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS
- r.dirty = True
+ r.invalidate_prompt()
r.isearch_term = ""
r.push_input_trans(r.isearch_trans)
r.isearch_start = r.historyi, r.pos
r.pop_input_trans()
r.select_item(r.isearch_start[0])
r.pos = r.isearch_start[1]
- r.dirty = True
+ r.invalidate_prompt()
class isearch_add_character(commands.Command):
r = self.reader
b = r.buffer
r.isearch_term += self.event[-1]
- r.dirty = True
+ r.invalidate_prompt()
p = r.pos + len(r.isearch_term) - 1
if b[p : p + 1] != [r.isearch_term[-1]]:
r.isearch_next()
r = self.reader
if len(r.isearch_term) > 0:
r.isearch_term = r.isearch_term[:-1]
- r.dirty = True
+ r.invalidate_prompt()
else:
r.error("nothing to rubout")
r.isearch_direction = ISEARCH_DIRECTION_NONE
r.console.forgetinput()
r.pop_input_trans()
- r.dirty = True
+ r.invalidate_prompt()
@dataclass
self.buffer = list(buf)
self.historyi = i
self.pos = len(self.buffer)
- self.dirty = True
- self.last_refresh_cache.invalidated = True
+ self.invalidate_buffer(0)
def get_item(self, i: int) -> str:
if i != len(self.history):
if forwards and not match_prefix:
self.pos = 0
self.buffer = []
- self.dirty = True
+ self.invalidate_buffer(0)
else:
self.error("not found")
return
from abc import ABC, abstractmethod
import unicodedata
from collections import deque
+from typing import TYPE_CHECKING
# types
-if False:
+if TYPE_CHECKING:
from .types import EventTuple
--- /dev/null
+"""Wrap content lines to the terminal width before rendering."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Self
+
+from .content import ContentFragment, ContentLine
+from .types import CursorXY, ScreenInfoRow
+
+
+@dataclass(frozen=True, slots=True)
+class LayoutRow:
+ """Metadata for one physical screen row.
+
+ For the row ``>>> def greet(name):``::
+
+ >>> def greet(name):
+ ╰─╯ ╰──────────────╯
+ 4 char_widths=(1,1,1,…) ← 16 entries
+ buffer_advance=17 ← includes the newline
+ """
+
+ prompt_width: int
+ char_widths: tuple[int, ...]
+ suffix_width: int = 0
+ buffer_advance: int = 0
+
+ @property
+ def width(self) -> int:
+ return self.prompt_width + sum(self.char_widths) + self.suffix_width
+
+ @property
+ def screeninfo(self) -> ScreenInfoRow:
+ widths = list(self.char_widths)
+ if self.suffix_width:
+ widths.append(self.suffix_width)
+ return self.prompt_width, widths
+
+
+@dataclass(frozen=True, slots=True)
+class LayoutMap:
+ """Mapping between buffer positions and screen coordinates.
+
+ Single source of truth for cursor placement. Given::
+
+ >>> def greet(name): ← row 0, buffer_advance=17
+ ... return name ← row 1, buffer_advance=15
+ ▲cursor
+
+ ``pos_to_xy(31)`` → ``(18, 1)``: prompt width 4 + 14 body chars.
+ """
+ rows: tuple[LayoutRow, ...]
+
+ @classmethod
+ def empty(cls) -> Self:
+ return cls((LayoutRow(0, ()),))
+
+ @property
+ def screeninfo(self) -> list[ScreenInfoRow]:
+ return [row.screeninfo for row in self.rows]
+
+ def max_column(self, y: int) -> int:
+ return self.rows[y].width
+
+ def max_row(self) -> int:
+ return len(self.rows) - 1
+
+ def pos_to_xy(self, pos: int) -> CursorXY:
+ if not self.rows:
+ return 0, 0
+
+ remaining = pos
+ for y, row in enumerate(self.rows):
+ if remaining <= len(row.char_widths):
+ # Prompt-only leading rows are terminal scenery, not real
+ # buffer positions. Treating them as real just manufactures
+ # bugs.
+ if remaining == 0 and not row.char_widths and row.buffer_advance == 0 and y < len(self.rows) - 1:
+ continue
+ x = row.prompt_width
+ for width in row.char_widths[:remaining]:
+ x += width
+ return x, y
+ remaining -= row.buffer_advance
+ last_row = self.rows[-1]
+ return last_row.width - last_row.suffix_width, len(self.rows) - 1
+
+ def xy_to_pos(self, x: int, y: int) -> int:
+ if not self.rows:
+ return 0
+
+ pos = 0
+ for row in self.rows[:y]:
+ pos += row.buffer_advance
+
+ row = self.rows[y]
+ cur_x = row.prompt_width
+ char_widths = row.char_widths
+ i = 0
+ for i, width in enumerate(char_widths):
+ if cur_x >= x:
+ # Include trailing zero-width (combining) chars at this position
+ for trailing_width in char_widths[i:]:
+ if trailing_width == 0:
+ pos += 1
+ else:
+ break
+ return pos
+ if width == 0:
+ pos += 1
+ continue
+ cur_x += width
+ pos += 1
+ return pos
+
+
+@dataclass(frozen=True, slots=True)
+class WrappedRow:
+ """One physical screen row after wrapping, ready for rendering.
+
+ When a line overflows the terminal width, it splits into
+ multiple rows with a ``\\`` continuation marker::
+
+ >>> x = "a very long li\\ ← suffix="\\", suffix_width=1
+ ne that wraps" ← prompt_text="" (continuation)
+ """
+ prompt_text: str = ""
+ prompt_width: int = 0
+ fragments: tuple[ContentFragment, ...] = ()
+ layout_widths: tuple[int, ...] = ()
+ suffix: str = ""
+ suffix_width: int = 0
+ buffer_advance: int = 0
+
+
+@dataclass(frozen=True, slots=True)
+class LayoutResult:
+ wrapped_rows: tuple[WrappedRow, ...]
+ layout_map: LayoutMap
+ line_end_offsets: tuple[int, ...]
+
+
+def layout_content_lines(
+ lines: tuple[ContentLine, ...],
+ width: int,
+ start_offset: int,
+) -> LayoutResult:
+ """Wrap content lines to fit *width* columns.
+
+ A short line passes through as one ``WrappedRow``; a long line is
+ split at the column boundary with ``\\`` markers::
+
+ >>> short = 1 ← one WrappedRow
+ >>> x = "a long stri\\ ← two WrappedRows, first has suffix="\\"
+ ng"
+ """
+ if width <= 0:
+ return LayoutResult((), LayoutMap(()), ())
+
+ offset = start_offset
+ wrapped_rows: list[WrappedRow] = []
+ layout_rows: list[LayoutRow] = []
+ line_end_offsets: list[int] = []
+
+ for line in lines:
+ newline_advance = int(line.source.has_newline)
+ for leading in line.prompt.leading_lines:
+ line_end_offsets.append(offset)
+ wrapped_rows.append(
+ WrappedRow(
+ fragments=(leading,),
+ )
+ )
+ layout_rows.append(LayoutRow(0, (), buffer_advance=0))
+
+ prompt_text = line.prompt.text
+ prompt_width = line.prompt.width
+ body = tuple(line.body)
+ body_widths = tuple(fragment.width for fragment in body)
+
+ # Fast path: line fits on one row.
+ if not body_widths or (sum(body_widths) + prompt_width) < width:
+ offset += len(body) + newline_advance
+ line_end_offsets.append(offset)
+ wrapped_rows.append(
+ WrappedRow(
+ prompt_text=prompt_text,
+ prompt_width=prompt_width,
+ fragments=body,
+ layout_widths=body_widths,
+ buffer_advance=len(body) + newline_advance,
+ )
+ )
+ layout_rows.append(
+ LayoutRow(
+ prompt_width,
+ body_widths,
+ buffer_advance=len(body) + newline_advance,
+ )
+ )
+ continue
+
+ # Slow path: line needs wrapping.
+ current_prompt = prompt_text
+ current_prompt_width = prompt_width
+ start = 0
+ total = len(body)
+ while True:
+ # Find how many characters fit on this row.
+ index_to_wrap_before = 0
+ column = 0
+ for char_width in body_widths[start:]:
+ if column + char_width + current_prompt_width >= width:
+ break
+ index_to_wrap_before += 1
+ column += char_width
+
+ if index_to_wrap_before == 0 and start < total:
+ index_to_wrap_before = 1 # force progress
+
+ at_line_end = (start + index_to_wrap_before) >= total
+ if at_line_end:
+ offset += index_to_wrap_before + newline_advance
+ suffix = ""
+ suffix_width = 0
+ buffer_advance = index_to_wrap_before + newline_advance
+ else:
+ offset += index_to_wrap_before
+ suffix = "\\"
+ suffix_width = 1
+ buffer_advance = index_to_wrap_before
+
+ end = start + index_to_wrap_before
+ row_fragments = body[start:end]
+ row_widths = body_widths[start:end]
+ line_end_offsets.append(offset)
+ wrapped_rows.append(
+ WrappedRow(
+ prompt_text=current_prompt,
+ prompt_width=current_prompt_width,
+ fragments=row_fragments,
+ layout_widths=row_widths,
+ suffix=suffix,
+ suffix_width=suffix_width,
+ buffer_advance=buffer_advance,
+ )
+ )
+ layout_rows.append(
+ LayoutRow(
+ current_prompt_width,
+ row_widths,
+ suffix_width=suffix_width,
+ buffer_advance=buffer_advance,
+ )
+ )
+
+ start = end
+ current_prompt = ""
+ current_prompt_width = 0
+ if at_line_end:
+ break
+
+ return LayoutResult(
+ tuple(wrapped_rows),
+ LayoutMap(tuple(layout_rows)),
+ tuple(line_end_offsets),
+ )
import _colorize
from contextlib import contextmanager
-from dataclasses import dataclass, field, fields
+from dataclasses import dataclass, field, fields, replace
+from typing import Self
from . import commands, console, input
-from .utils import wlen, unbracket, disp_str, gen_colors, THEME
+from .content import (
+ ContentFragment,
+ ContentLine,
+ SourceLine,
+ build_body_fragments,
+ process_prompt as build_prompt_content,
+)
+from .layout import LayoutMap, LayoutResult, LayoutRow, WrappedRow, layout_content_lines
+from .render import RenderCell, RenderLine, RenderedScreen, ScreenOverlay
+from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, wlen, gen_colors
from .trace import trace
# types
Command = commands.Command
-from .types import Callback, SimpleContextManager, KeySpec, CommandName
+from .types import (
+ Callback,
+ CommandName,
+ CursorXY,
+ Dimensions,
+ EventData,
+ KeySpec,
+ Keymap,
+ ScreenInfoRow,
+ SimpleContextManager,
+)
+
+type CommandClass = type[Command]
+type CommandInput = tuple[CommandName | CommandClass, EventData]
+type PromptCellCacheKey = tuple[str, bool]
# syntax classes
return st
-def make_default_commands() -> dict[CommandName, type[Command]]:
- result: dict[CommandName, type[Command]] = {}
+def make_default_commands() -> dict[CommandName, CommandClass]:
+ result: dict[CommandName, CommandClass] = {}
for v in vars(commands).values():
if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower():
result[v.__name__] = v
return result
-default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
+default_keymap: Keymap = tuple(
[
(r"\C-a", "beginning-of-line"),
(r"\C-b", "left"),
)
+@dataclass(frozen=True, slots=True)
+class RefreshInvalidation:
+ """Which parts of the screen need to be recomputed on the next refresh."""
+
+ cursor_only: bool = False
+ buffer_from_pos: int | None = None
+ prompt: bool = False
+ layout: bool = False
+ theme: bool = False
+ message: bool = False
+ overlay: bool = False
+ full: bool = False
+
+ @classmethod
+ def empty(cls) -> Self:
+ return cls()
+
+ @property
+ def needs_screen_refresh(self) -> bool:
+ return any(
+ (
+ self.buffer_from_pos is not None,
+ self.prompt,
+ self.layout,
+ self.theme,
+ self.message,
+ self.overlay,
+ self.full,
+ )
+ )
+
+ @property
+ def is_cursor_only(self) -> bool:
+ return self.cursor_only and not self.needs_screen_refresh
+
+ @property
+ def buffer_rebuild_from_pos(self) -> int | None:
+ if self.full or self.prompt or self.layout or self.theme:
+ return 0
+ return self.buffer_from_pos
+
+ def with_cursor(self) -> Self:
+ if self.needs_screen_refresh:
+ return self
+ return replace(self, cursor_only=True)
+
+ def with_buffer(self, from_pos: int) -> Self:
+ current = from_pos
+ if self.buffer_from_pos is not None:
+ current = min(current, self.buffer_from_pos)
+ return replace(self, cursor_only=False, buffer_from_pos=current)
+
+ def with_prompt(self) -> Self:
+ return replace(self, cursor_only=False, prompt=True)
+
+ def with_layout(self) -> Self:
+ return replace(self, cursor_only=False, layout=True)
+
+ def with_theme(self) -> Self:
+ return replace(self, cursor_only=False, theme=True)
+
+ def with_message(self) -> Self:
+ return replace(self, cursor_only=False, message=True)
+
+ def with_overlay(self) -> Self:
+ return replace(self, cursor_only=False, overlay=True)
+
+ def with_full(self) -> Self:
+ return replace(self, cursor_only=False, full=True)
+
+
@dataclass(slots=True)
class Reader:
"""The Reader class implements the bare bones of a command reader,
* pos:
A 0-based index into 'buffer' for where the insertion point
is.
- * screeninfo:
- A list of screen position tuples. Each list element is a tuple
- representing information on visible line length for a given line.
- Allows for efficient skipping of color escape sequences.
+ * layout:
+ A mapping between buffer positions and rendered rows/columns.
+ It is the internal source of truth for cursor placement.
* cxy, lxy:
the position of the insertion point in screen ...
* syntax_table:
* arg:
The emacs-style prefix argument. It will be None if no such
argument has been provided.
- * dirty:
- True if we need to refresh the display.
* kill_ring:
The emacs-style kill-ring; manipulated with yank & yank-pop
* ps1, ps2, ps3, ps4:
kill_ring: list[list[str]] = field(default_factory=list)
msg: str = ""
arg: int | None = None
- dirty: bool = False
finished: bool = False
paste_mode: bool = False
- commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
- last_command: type[Command] | None = None
+ commands: dict[CommandName, CommandClass] = field(default_factory=make_default_commands)
+ last_command: CommandClass | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
- keymap: tuple[tuple[str, str], ...] = ()
+ keymap: Keymap = ()
input_trans: input.KeymapTranslator = field(init=False)
input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
- screen: list[str] = field(default_factory=list)
- screeninfo: list[tuple[int, list[int]]] = field(init=False)
- cxy: tuple[int, int] = field(init=False)
- lxy: tuple[int, int] = field(init=False)
- scheduled_commands: list[str] = field(default_factory=list)
+ rendered_screen: RenderedScreen = field(init=False)
+ layout: LayoutMap = field(init=False)
+ cxy: CursorXY = field(init=False)
+ lxy: CursorXY = field(init=False)
+ scheduled_commands: list[CommandName] = field(default_factory=list)
can_colorize: bool = False
threading_hook: Callback | None = None
+ invalidation: RefreshInvalidation = field(init=False)
## cached metadata to speed up screen refreshes
@dataclass
class RefreshCache:
- screen: list[str] = field(default_factory=list)
- screeninfo: list[tuple[int, list[int]]] = field(init=False)
+ """Previously computed render/layout data for incremental refresh."""
+
+ render_lines: list[RenderLine] = field(default_factory=list)
+ layout_rows: list[LayoutRow] = field(default_factory=list)
line_end_offsets: list[int] = field(default_factory=list)
- pos: int = field(init=False)
- cxy: tuple[int, int] = field(init=False)
- dimensions: tuple[int, int] = field(init=False)
- invalidated: bool = False
+ pos: int = 0
+ dimensions: Dimensions = (0, 0)
def update_cache(self,
reader: Reader,
- screen: list[str],
- screeninfo: list[tuple[int, list[int]]],
+ render_lines: list[RenderLine],
+ layout_rows: list[LayoutRow],
+ line_end_offsets: list[int],
) -> None:
- self.screen = screen.copy()
- self.screeninfo = screeninfo.copy()
+ self.render_lines = render_lines.copy()
+ self.layout_rows = layout_rows.copy()
+ self.line_end_offsets = line_end_offsets.copy()
self.pos = reader.pos
- self.cxy = reader.cxy
self.dimensions = reader.console.width, reader.console.height
- self.invalidated = False
def valid(self, reader: Reader) -> bool:
- if self.invalidated:
- return False
dimensions = reader.console.width, reader.console.height
dimensions_changed = dimensions != self.dimensions
return not dimensions_changed
- def get_cached_location(self, reader: Reader) -> tuple[int, int]:
- if self.invalidated:
- raise ValueError("Cache is invalidated")
- offset = 0
- earliest_common_pos = min(reader.pos, self.pos)
+ def get_cached_location(
+ self,
+ reader: Reader,
+ buffer_from_pos: int | None = None,
+ *,
+ reuse_full: bool = False,
+ ) -> tuple[int, int]:
+ """Return (buffer_offset, num_reusable_lines) for incremental refresh.
+
+ Three paths:
+ - reuse_full (overlay/message-only): reuse all cached lines.
+ - buffer_from_pos=None (full rebuild): rewind to common cursor pos.
+ - explicit buffer_from_pos: reuse lines before that position.
+ """
+ if reuse_full:
+ if self.line_end_offsets:
+ last_offset = self.line_end_offsets[-1]
+ if last_offset >= len(reader.buffer):
+ return last_offset, len(self.line_end_offsets)
+ return 0, 0
+ if buffer_from_pos is None:
+ buffer_from_pos = min(reader.pos, self.pos)
num_common_lines = len(self.line_end_offsets)
while num_common_lines > 0:
- offset = self.line_end_offsets[num_common_lines - 1]
- if earliest_common_pos > offset:
+ candidate = self.line_end_offsets[num_common_lines - 1]
+ if buffer_from_pos > candidate:
break
num_common_lines -= 1
- else:
- offset = 0
+ # Prompt-only leading rows consume no buffer content. Reusing them
+ # in isolation causes the next incremental rebuild to emit them a
+ # second time.
+ while (
+ num_common_lines > 0
+ and self.layout_rows[num_common_lines - 1].buffer_advance == 0
+ ):
+ num_common_lines -= 1
+ offset = self.line_end_offsets[num_common_lines - 1] if num_common_lines else 0
return offset, num_common_lines
last_refresh_cache: RefreshCache = field(default_factory=RefreshCache)
self.input_trans = input.KeymapTranslator(
self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
)
- self.screeninfo = [(0, [])]
+ self.layout = LayoutMap.empty()
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
+ self.rendered_screen = RenderedScreen.empty()
self.can_colorize = _colorize.can_colorize()
+ self.invalidation = RefreshInvalidation.empty()
- self.last_refresh_cache.screeninfo = self.screeninfo
+ self.last_refresh_cache.layout_rows = list(self.layout.rows)
self.last_refresh_cache.pos = self.pos
- self.last_refresh_cache.cxy = self.cxy
+
self.last_refresh_cache.dimensions = (0, 0)
- def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
- return default_keymap
+ @property
+ def screen(self) -> list[str]:
+ return list(self.rendered_screen.screen_lines)
- def calc_screen(self) -> list[str]:
- """Translate changes in self.buffer into changes in self.console.screen."""
- # Since the last call to calc_screen:
- # screen and screeninfo may differ due to a completion menu being shown
- # pos and cxy may differ due to edits, cursor movements, or completion menus
+ @property
+ def screeninfo(self) -> list[ScreenInfoRow]:
+ return self.layout.screeninfo
- # Lines that are above both the old and new cursor position can't have changed,
- # unless the terminal has been resized (which might cause reflowing) or we've
- # entered or left paste mode (which changes prompts, causing reflowing).
+ def collect_keymap(self) -> Keymap:
+ return default_keymap
+
+ def calc_screen(self) -> RenderedScreen:
+ """Translate the editable buffer into a base rendered screen."""
num_common_lines = 0
offset = 0
if self.last_refresh_cache.valid(self):
- offset, num_common_lines = self.last_refresh_cache.get_cached_location(self)
-
- screen = self.last_refresh_cache.screen
- del screen[num_common_lines:]
-
- screeninfo = self.last_refresh_cache.screeninfo
- del screeninfo[num_common_lines:]
-
- last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets
- del last_refresh_line_end_offsets[num_common_lines:]
+ if (
+ self.invalidation.buffer_from_pos is None
+ and not (
+ self.invalidation.full
+ or self.invalidation.prompt
+ or self.invalidation.layout
+ or self.invalidation.theme
+ )
+ and (self.invalidation.message or self.invalidation.overlay)
+ ):
+ # Fast path: only overlays or messages changed.
+ offset, num_common_lines = self.last_refresh_cache.get_cached_location(
+ self,
+ reuse_full=True,
+ )
+ assert not self.last_refresh_cache.line_end_offsets or (
+ self.last_refresh_cache.line_end_offsets[-1] >= len(self.buffer)
+ ), "Buffer modified without invalidate_buffer() call"
+ else:
+ offset, num_common_lines = self.last_refresh_cache.get_cached_location(
+ self,
+ self._buffer_refresh_from_pos(),
+ )
+
+ base_render_lines = self.last_refresh_cache.render_lines[:num_common_lines]
+ layout_rows = self.last_refresh_cache.layout_rows[:num_common_lines]
+ last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets[:num_common_lines]
+
+ source_lines = self._build_source_lines(offset, num_common_lines)
+ content_lines = self._build_content_lines(
+ source_lines,
+ prompt_from_cache=bool(offset and self.buffer[offset - 1] != "\n"),
+ )
+ layout_result = self._layout_content(content_lines, offset)
+ base_render_lines.extend(self._render_wrapped_rows(layout_result.wrapped_rows))
+ layout_rows.extend(layout_result.layout_map.rows)
+ last_refresh_line_end_offsets.extend(layout_result.line_end_offsets)
- pos = self.pos
- pos -= offset
+ self.layout = LayoutMap(tuple(layout_rows))
+ self.cxy = self.pos2xy()
+ if not source_lines:
+ # reuse_full path: _build_source_lines didn't run,
+ # so lxy wasn't updated. Derive it from the buffer.
+ self.lxy = self._compute_lxy()
+ self.last_refresh_cache.update_cache(
+ self,
+ base_render_lines,
+ layout_rows,
+ last_refresh_line_end_offsets,
+ )
+ return RenderedScreen(tuple(base_render_lines), self.cxy)
- prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
+ def _buffer_refresh_from_pos(self) -> int:
+ """Return buffer position from which to rebuild content.
- if self.can_colorize:
- colors = list(gen_colors(self.get_unicode()))
+ Returns 0 (full rebuild) when no incremental position is known.
+ """
+ buffer_from_pos = self.invalidation.buffer_rebuild_from_pos
+ if buffer_from_pos is not None:
+ return buffer_from_pos
+ return 0
+
+ def _compute_lxy(self) -> CursorXY:
+ """Derive logical cursor (col, lineno) from the buffer and pos."""
+ text = "".join(self.buffer[:self.pos])
+ lineno = text.count("\n")
+ if lineno:
+ col = self.pos - text.rindex("\n") - 1
else:
- colors = None
- trace("colors = {colors}", colors=colors)
+ col = self.pos
+ return col, lineno
+
+ def _build_source_lines(
+ self,
+ offset: int,
+ first_lineno: int,
+ ) -> tuple[SourceLine, ...]:
+ if offset == len(self.buffer) and (offset > 0 or first_lineno > 0):
+ return ()
+
+ pos = self.pos - offset
lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
- for ln, line in enumerate(lines, num_common_lines):
+ source_lines: list[SourceLine] = []
+ current_offset = offset
+
+ for line_index, line in enumerate(lines):
+ lineno = first_lineno + line_index
+ has_newline = line_index < len(lines) - 1
line_len = len(line)
+ cursor_index: int | None = None
if 0 <= pos <= line_len:
- self.lxy = pos, ln
+ cursor_index = pos
+ self.lxy = pos, lineno
cursor_found = True
elif cursor_found:
lines_beyond_cursor += 1
if lines_beyond_cursor > self.console.height:
- # No need to keep formatting lines.
- # The console can't show them.
break
+
+ source_lines.append(
+ SourceLine(
+ lineno=lineno,
+ text=line,
+ start_offset=current_offset,
+ has_newline=has_newline,
+ cursor_index=cursor_index,
+ )
+ )
+ pos -= line_len + 1
+ current_offset += line_len + (1 if has_newline else 0)
+
+ return tuple(source_lines)
+
+ def _build_content_lines(
+ self,
+ source_lines: tuple[SourceLine, ...],
+ *,
+ prompt_from_cache: bool,
+ ) -> tuple[ContentLine, ...]:
+ if self.can_colorize:
+ colors = list(gen_colors(self.get_unicode()))
+ else:
+ colors = None
+ trace("colors = {colors}", colors=colors)
+
+ content_lines: list[ContentLine] = []
+ for source_line in source_lines:
if prompt_from_cache:
- # Only the first line's prompt can come from the cache
prompt_from_cache = False
prompt = ""
else:
- prompt = self.get_prompt(ln, line_len >= pos >= 0)
- while "\n" in prompt:
- pre_prompt, _, prompt = prompt.partition("\n")
- last_refresh_line_end_offsets.append(offset)
- screen.append(pre_prompt)
- screeninfo.append((0, []))
- pos -= line_len + 1
- prompt, prompt_len = self.process_prompt(prompt)
- chars, char_widths = disp_str(line, colors, offset)
- wrapcount = (sum(char_widths) + prompt_len) // self.console.width
- if wrapcount == 0 or not char_widths:
- offset += line_len + 1 # Takes all of the line plus the newline
- last_refresh_line_end_offsets.append(offset)
- screen.append(prompt + "".join(chars))
- screeninfo.append((prompt_len, char_widths))
- else:
- pre = prompt
- prelen = prompt_len
- for wrap in range(wrapcount + 1):
- index_to_wrap_before = 0
- column = 0
- for char_width in char_widths:
- if column + char_width + prelen >= self.console.width:
- break
- index_to_wrap_before += 1
- column += char_width
- if len(chars) > index_to_wrap_before:
- offset += index_to_wrap_before
- post = "\\"
- after = [1]
- else:
- offset += index_to_wrap_before + 1 # Takes the newline
- post = ""
- after = []
- last_refresh_line_end_offsets.append(offset)
- render = pre + "".join(chars[:index_to_wrap_before]) + post
- render_widths = char_widths[:index_to_wrap_before] + after
- screen.append(render)
- screeninfo.append((prelen, render_widths))
- chars = chars[index_to_wrap_before:]
- char_widths = char_widths[index_to_wrap_before:]
- pre = ""
- prelen = 0
- self.screeninfo = screeninfo
- self.cxy = self.pos2xy()
- if self.msg:
- width = self.console.width
- for mline in self.msg.split("\n"):
- # If self.msg is larger than console width, make it fit
- # TODO: try to split between words?
- if not mline:
- screen.append("")
- screeninfo.append((0, []))
- continue
- for r in range((len(mline) - 1) // width + 1):
- screen.append(mline[r * width : (r + 1) * width])
- screeninfo.append((0, []))
-
- self.last_refresh_cache.update_cache(self, screen, screeninfo)
- return screen
+ prompt = self.get_prompt(source_line.lineno, source_line.cursor_on_line)
+ content_lines.append(
+ ContentLine(
+ source=source_line,
+ prompt=build_prompt_content(prompt),
+ body=build_body_fragments(
+ source_line.text,
+ colors,
+ source_line.start_offset,
+ ),
+ )
+ )
+ return tuple(content_lines)
+
+ def _layout_content(
+ self,
+ content_lines: tuple[ContentLine, ...],
+ offset: int,
+ ) -> LayoutResult:
+ return layout_content_lines(content_lines, self.console.width, offset)
+
+ def _render_wrapped_rows(
+ self,
+ wrapped_rows: tuple[WrappedRow, ...],
+ ) -> list[RenderLine]:
+ return [
+ self._render_line(
+ row.prompt_text,
+ row.fragments,
+ row.suffix,
+ )
+ for row in wrapped_rows
+ ]
+
+ def _render_message_lines(self) -> tuple[RenderLine, ...]:
+ if not self.msg:
+ return ()
+ width = self.console.width
+ render_lines: list[RenderLine] = []
+ for message_line in self.msg.split("\n"):
+ # If self.msg is larger than console width, make it fit.
+ # TODO: try to split between words?
+ if not message_line:
+ render_lines.append(RenderLine.from_rendered_text(""))
+ continue
+ for offset in range(0, len(message_line), width):
+ render_lines.append(
+ RenderLine.from_rendered_text(message_line[offset : offset + width])
+ )
+ return tuple(render_lines)
+
+ def get_screen_overlays(self) -> tuple[ScreenOverlay, ...]:
+ return ()
+
+ def compose_rendered_screen(self, base_screen: RenderedScreen) -> RenderedScreen:
+ overlays = list(self.get_screen_overlays())
+ message_lines = self._render_message_lines()
+ if message_lines:
+ overlays.append(ScreenOverlay(len(base_screen.lines), message_lines))
+ if not overlays:
+ return base_screen
+ return RenderedScreen(base_screen.lines, base_screen.cursor, tuple(overlays))
+
+ _prompt_cell_cache: dict[PromptCellCacheKey, tuple[RenderCell, ...]] = field(
+ init=False, default_factory=dict, repr=False
+ )
+
+ def _render_line(
+ self,
+ prefix: str,
+ fragments: tuple[ContentFragment, ...],
+ suffix: str = "",
+ ) -> RenderLine:
+ cells: list[RenderCell] = []
+ if prefix:
+ cache_key = (prefix, self.can_colorize)
+ cached = self._prompt_cell_cache.get(cache_key)
+ if cached is None:
+ prompt_cells = RenderLine.from_rendered_text(prefix).cells
+ if self.can_colorize and prompt_cells and not ANSI_ESCAPE_SEQUENCE.search(prefix):
+ prompt_style = StyleRef.from_tag("prompt", THEME()["prompt"])
+ prompt_cells = tuple(
+ RenderCell(
+ cell.text,
+ cell.width,
+ style=prompt_style if cell.text else cell.style,
+ controls=cell.controls,
+ )
+ for cell in prompt_cells
+ )
+ self._prompt_cell_cache[cache_key] = prompt_cells
+ cached = prompt_cells
+ cells.extend(cached)
+ cells.extend(
+ RenderCell(fragment.text, fragment.width, style=fragment.style)
+ for fragment in fragments
+ )
+ if suffix:
+ cells.extend(RenderLine.from_rendered_text(suffix).cells)
+ return RenderLine.from_cells(cells)
@staticmethod
def process_prompt(prompt: str) -> tuple[str, int]:
(\x01 and \x02) removed. The length ignores anything between those
brackets as well as any ANSI escape sequences.
"""
- out_prompt = unbracket(prompt, including_content=False)
- visible_prompt = unbracket(prompt, including_content=True)
- return out_prompt, wlen(visible_prompt)
+ prompt_content = build_prompt_content(prompt)
+ return prompt_content.text, prompt_content.width
def bow(self, p: int | None = None) -> int:
"""Return the 0-based index of the word break preceding p most
def max_column(self, y: int) -> int:
"""Return the last x-offset for line y"""
- return self.screeninfo[y][0] + sum(self.screeninfo[y][1])
+ return self.layout.max_column(y)
def max_row(self) -> int:
- return len(self.screeninfo) - 1
+ return self.layout.max_row()
def get_arg(self, default: int = 1) -> int:
"""Return any prefix argument that the user has supplied,
prompt = self.ps3
else:
prompt = self.ps1
-
- if self.can_colorize:
- t = THEME()
- prompt = f"{t.prompt}{prompt}{t.reset}"
return prompt
def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
def setpos_from_xy(self, x: int, y: int) -> None:
"""Set pos according to coordinates x, y"""
- pos = 0
- i = 0
- while i < y:
- prompt_len, char_widths = self.screeninfo[i]
- offset = len(char_widths)
- in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width
- if in_wrapped_line:
- pos += offset - 1 # -1 cause backslash is not in buffer
- else:
- pos += offset + 1 # +1 cause newline is in buffer
- i += 1
-
- j = 0
- cur_x = self.screeninfo[i][0]
- while cur_x < x:
- if self.screeninfo[i][1][j] == 0:
- j += 1 # prevent potential future infinite loop
- continue
- cur_x += self.screeninfo[i][1][j]
- j += 1
- pos += 1
+ self.pos = self.layout.xy_to_pos(x, y)
- self.pos = pos
-
- def pos2xy(self) -> tuple[int, int]:
+ def pos2xy(self) -> CursorXY:
"""Return the x, y coordinates of position 'pos'."""
+ assert 0 <= self.pos <= len(self.buffer)
+ return self.layout.pos_to_xy(self.pos)
+
+ def insert(self, text: str | list[str]) -> None:
+ """Insert 'text' at the insertion point."""
+ start = self.pos
+ self.buffer[self.pos : self.pos] = list(text)
+ self.pos += len(text)
+ self.invalidate_buffer(start)
- prompt_len, y = 0, 0
- char_widths: list[int] = []
- pos = self.pos
- assert 0 <= pos <= len(self.buffer)
+ def invalidate_cursor(self) -> None:
+ self.invalidation = self.invalidation.with_cursor()
- # optimize for the common case: typing at the end of the buffer
- if pos == len(self.buffer) and len(self.screeninfo) > 0:
- y = len(self.screeninfo) - 1
- prompt_len, char_widths = self.screeninfo[y]
- return prompt_len + sum(char_widths), y
+ def invalidate_buffer(self, from_pos: int) -> None:
+ self.invalidation = self.invalidation.with_buffer(from_pos)
- for prompt_len, char_widths in self.screeninfo:
- offset = len(char_widths)
- in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width
- if in_wrapped_line:
- offset -= 1 # need to remove line-wrapping backslash
+ def invalidate_prompt(self) -> None:
+ self._prompt_cell_cache.clear()
+ self.invalidation = self.invalidation.with_prompt()
- if offset >= pos:
- break
+ def invalidate_layout(self) -> None:
+ self.invalidation = self.invalidation.with_layout()
- if not in_wrapped_line:
- offset += 1 # there's a newline in buffer
+ def invalidate_theme(self) -> None:
+ self._prompt_cell_cache.clear()
+ self.invalidation = self.invalidation.with_theme()
- pos -= offset
- y += 1
- return prompt_len + sum(char_widths[:pos]), y
+ def invalidate_message(self) -> None:
+ self.invalidation = self.invalidation.with_message()
- def insert(self, text: str | list[str]) -> None:
- """Insert 'text' at the insertion point."""
- self.buffer[self.pos : self.pos] = list(text)
- self.pos += len(text)
- self.dirty = True
+ def invalidate_overlay(self) -> None:
+ self.invalidation = self.invalidation.with_overlay()
+
+ def invalidate_full(self) -> None:
+ self.invalidation = self.invalidation.with_full()
+
+ def clear_invalidation(self) -> None:
+ self.invalidation = RefreshInvalidation.empty()
def update_cursor(self) -> None:
"""Move the cursor to reflect changes in self.pos"""
"""This function is called to allow post command cleanup."""
if getattr(cmd, "kills_digit_arg", True):
if self.arg is not None:
- self.dirty = True
+ self.invalidate_prompt()
self.arg = None
def prepare(self) -> None:
self.finished = False
del self.buffer[:]
self.pos = 0
- self.dirty = True
+ self.layout = LayoutMap.empty()
+ self.cxy = self.pos2xy()
+ self.lxy = (self.pos, 0)
+ self.rendered_screen = RenderedScreen.empty()
+ self.invalidate_full()
self.last_command = None
- self.calc_screen()
+ base_screen = self.calc_screen()
+ self.rendered_screen = self.compose_rendered_screen(base_screen)
+ self.invalidation = RefreshInvalidation.empty()
except BaseException:
self.restore()
raise
cmd = self.scheduled_commands.pop()
self.do_cmd((cmd, []))
- def last_command_is(self, cls: type) -> bool:
+ def last_command_is(self, cls: CommandClass) -> bool:
if not self.last_command:
return False
return issubclass(cls, self.last_command)
def error(self, msg: str = "none") -> None:
self.msg = "! " + msg + " "
- self.dirty = True
+ self.invalidate_message()
self.console.beep()
def update_screen(self) -> None:
- if self.dirty:
+ if self.invalidation.is_cursor_only:
+ self.update_cursor()
+ self.clear_invalidation()
+ elif self.invalidation.needs_screen_refresh:
self.refresh()
def refresh(self) -> None:
"""Recalculate and refresh the screen."""
self.console.height, self.console.width = self.console.getheightwidth()
# this call sets up self.cxy, so call it first.
- self.screen = self.calc_screen()
- self.console.refresh(self.screen, self.cxy)
- self.dirty = False
+ base_screen = self.calc_screen()
+ rendered_screen = self.compose_rendered_screen(base_screen)
+ self.rendered_screen = rendered_screen
+ trace(
+ "reader.refresh cursor={cursor} lines={lines} "
+ "dims=({width},{height}) invalidation={invalidation}",
+ cursor=self.cxy,
+ lines=len(rendered_screen.composed_lines),
+ width=self.console.width,
+ height=self.console.height,
+ invalidation=self.invalidation,
+ )
+ self.console.refresh(rendered_screen)
+ self.clear_invalidation()
- def do_cmd(self, cmd: tuple[str, list[str]]) -> None:
+ def do_cmd(self, cmd: CommandInput) -> None:
"""`cmd` is a tuple of "event_name" and "event", which in the current
implementation is always just the "buffer" which happens to be a list
of single-character strings."""
command.do()
self.after_command(command)
-
- if self.dirty:
- self.refresh()
- else:
- self.update_cursor()
-
- if not isinstance(cmd, commands.digit_arg):
+ if (
+ not self.invalidation.needs_screen_refresh
+ and not self.invalidation.is_cursor_only
+ ):
+ self.invalidate_cursor()
+ self.update_screen()
+
+ if command_type is not commands.digit_arg:
self.last_command = command_type
self.finished = bool(command.finish)
if self.msg:
self.msg = ""
- self.dirty = True
+ self.invalidate_message()
while True:
# We use the same timeout as in readline.c: 100ms
if event.evt == "key":
self.input_trans.push(event)
elif event.evt == "scroll":
+ self.invalidate_full()
self.refresh()
+ return True
elif event.evt == "resize":
+ self.invalidate_full()
self.refresh()
+ return True
else:
translate = False
def do(self) -> None:
r: ReadlineAlikeReader
r = self.reader # type: ignore[assignment]
- r.dirty = True # this is needed to hide the completion menu, if visible
+ r.invalidate_overlay() # hide completion menu, if visible
# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
break
r.pos -= repeat
del b[r.pos : r.pos + repeat]
- r.dirty = True
+ r.invalidate_buffer(r.pos)
else:
self.reader.error("can't backspace at start")
def get_completer_delims(self) -> str:
return "".join(sorted(self.config.completer_delims))
- def _histline(self, line: str) -> str:
+ def _histline(self, line: str, *, sanitize_nuls: bool = False) -> str:
line = line.rstrip("\n")
+ if "\0" in line:
+ if not sanitize_nuls:
+ raise ValueError("embedded null character")
+ line = line.replace("\0", "")
return line
def get_history_length(self) -> int:
if line.endswith("\r"):
buffer.append(line+'\n')
else:
- line = self._histline(line)
+ line = self._histline(line, sanitize_nuls=True)
if buffer:
- line = self._histline("".join(buffer).replace("\r", "") + line)
+ line = self._histline(
+ "".join(buffer).replace("\r", "") + line,
+ sanitize_nuls=True,
+ )
del buffer[:]
if line:
history.append(line)
--- /dev/null
+from __future__ import annotations
+
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass, field
+from typing import Literal, Protocol, Self
+
+from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, str_width
+from .types import CursorXY
+
+type RenderStyle = StyleRef | str | None
+type LineUpdateKind = Literal[
+ "insert_char",
+ "replace_char",
+ "replace_span",
+ "delete_then_insert",
+ "rewrite_suffix",
+]
+
+
+class _ThemeSyntax(Protocol):
+ """Protocol for theme objects that map tag names to SGR escape strings."""
+ def __getitem__(self, key: str, /) -> str: ...
+
+
+@dataclass(frozen=True, slots=True)
+class RenderCell:
+ """One terminal cell: a character, its column width, and SGR style.
+
+ A screen row like ``>>> def`` is a sequence of cells::
+
+ > > > d e f
+ ╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯╰─╯
+ """
+
+ text: str
+ width: int
+ style: StyleRef = field(default_factory=StyleRef)
+ controls: tuple[str, ...] = ()
+
+ @property
+ def terminal_text(self) -> str:
+ return render_cells((self,))
+
+
+def _theme_style(theme: _ThemeSyntax, tag: str) -> str:
+ return theme[tag]
+
+
+def _style_escape(style: StyleRef) -> str:
+ if style.sgr:
+ return style.sgr
+ if style.tag is None:
+ return ""
+ return _theme_style(THEME(), style.tag)
+
+
+def _update_terminal_state(state: str, escape: str) -> str:
+ if escape in {"\x1b[0m", "\x1b[m"}:
+ return ""
+ return state + escape
+
+
+def _cells_from_rendered_text(text: str) -> tuple[RenderCell, ...]:
+ if not text:
+ return ()
+
+ cells: list[RenderCell] = []
+ pending_controls: list[str] = []
+ active_sgr = ""
+ index = 0
+
+ def append_plain_text(segment: str) -> None:
+ nonlocal pending_controls
+ if not segment:
+ return
+ if pending_controls:
+ cells.append(RenderCell("", 0, controls=tuple(pending_controls)))
+ pending_controls = []
+ for char in segment:
+ cells.append(
+ RenderCell(
+ char,
+ str_width(char),
+ style=StyleRef.from_sgr(active_sgr),
+ )
+ )
+
+ for match in ANSI_ESCAPE_SEQUENCE.finditer(text):
+ append_plain_text(text[index : match.start()])
+ escape = match.group(0)
+ if escape.endswith("m"):
+ active_sgr = _update_terminal_state(active_sgr, escape)
+ else:
+ pending_controls.append(escape)
+ index = match.end()
+
+ append_plain_text(text[index:])
+ if pending_controls:
+ cells.append(RenderCell("", 0, controls=tuple(pending_controls)))
+
+ return tuple(cells)
+
+
+@dataclass(frozen=True, slots=True)
+class RenderLine:
+ """One physical screen row as a tuple of :class:`RenderCell` objects.
+
+ ``text`` is the pre-rendered terminal string (characters + SGR escapes);
+ ``width`` is the total visible column count.
+ """
+
+ cells: tuple[RenderCell, ...]
+ text: str
+ width: int
+
+ @classmethod
+ def from_cells(cls, cells: Iterable[RenderCell]) -> Self:
+ cell_tuple = tuple(cells)
+ return cls(
+ cells=cell_tuple,
+ text=render_cells(cell_tuple),
+ width=sum(cell.width for cell in cell_tuple),
+ )
+
+ @classmethod
+ def from_parts(
+ cls,
+ parts: Sequence[str],
+ widths: Sequence[int],
+ styles: Sequence[RenderStyle] | None = None,
+ ) -> Self:
+ if styles is None:
+ return cls.from_cells(
+ RenderCell(text, width)
+ for text, width in zip(parts, widths)
+ )
+
+ cells: list[RenderCell] = []
+ for text, width, style in zip(parts, widths, styles):
+ if isinstance(style, StyleRef):
+ cells.append(RenderCell(text, width, style=style))
+ elif style is None:
+ cells.append(RenderCell(text, width))
+ else:
+ cells.append(RenderCell(text, width, style=StyleRef.from_tag(style)))
+ return cls.from_cells(cells)
+
+ @classmethod
+ def from_rendered_text(cls, text: str) -> Self:
+ return cls.from_cells(_cells_from_rendered_text(text))
+
+
+@dataclass(frozen=True, slots=True)
+class ScreenOverlay:
+ """An overlay that replaces or inserts lines at a screen position.
+
+ If *insert* is True, lines are spliced in (shifting content down);
+ if False (default), lines replace existing content at *y*.
+
+ Overlays are used to display tab completion menus and status messages.
+ For example, a tab-completion menu inserted below the input::
+
+ >>> os.path.j ← line 0 (base content)
+ join ← ScreenOverlay(y=1, insert=True)
+ junction ← (pushes remaining lines down)
+ ... ← line 1 (shifted down by 2)
+ """
+ y: int
+ lines: tuple[RenderLine, ...]
+ insert: bool = False
+
+
+@dataclass(frozen=True, slots=True)
+class RenderedScreen:
+ """The complete screen state: content lines, cursor, and overlays.
+
+ ``lines`` holds the base content; ``composed_lines`` is the final
+ result after overlays (completion menus, messages) are applied::
+
+ lines: composed_lines:
+ ┌──────────────────┐ ┌──────────────────┐
+ │>>> os.path.j │ │>>> os.path.j │
+ │... │ ──► │ join │ ← overlay
+ └──────────────────┘ │... │
+ └──────────────────┘
+ """
+
+ lines: tuple[RenderLine, ...]
+ cursor: CursorXY
+ overlays: tuple[ScreenOverlay, ...] = ()
+ composed_lines: tuple[RenderLine, ...] = field(init=False, default=())
+
+ def __post_init__(self) -> None:
+ object.__setattr__(self, "composed_lines", self._compose())
+
+ def _compose(self) -> tuple[RenderLine, ...]:
+ """Apply overlays in tuple order; inserts shift subsequent positions."""
+ if not self.overlays:
+ return self.lines
+
+ lines = list(self.lines)
+ y_offset = 0
+ for overlay in self.overlays:
+ adjusted_y = overlay.y + y_offset
+ assert adjusted_y >= 0, (
+ f"Overlay y={overlay.y} with offset={y_offset} is negative; "
+ "overlays must be sorted by ascending y"
+ )
+ if overlay.insert:
+ # Splice overlay lines in, pushing existing content down.
+ lines[adjusted_y:adjusted_y] = overlay.lines
+ y_offset += len(overlay.lines)
+ else:
+ # Replace existing lines at the overlay position.
+ target_len = adjusted_y + len(overlay.lines)
+ if len(lines) < target_len:
+ lines.extend([EMPTY_RENDER_LINE] * (target_len - len(lines)))
+ for index, line in enumerate(overlay.lines):
+ lines[adjusted_y + index] = line
+ return tuple(lines)
+
+ @classmethod
+ def empty(cls) -> Self:
+ return cls((), (0, 0), ())
+
+ @classmethod
+ def from_screen_lines(
+ cls,
+ screen: Sequence[str],
+ cursor: CursorXY,
+ ) -> Self:
+ return cls(
+ tuple(RenderLine.from_rendered_text(line) for line in screen),
+ cursor,
+ (),
+ )
+
+ def with_overlay(
+ self,
+ y: int,
+ lines: Iterable[RenderLine],
+ ) -> Self:
+ return type(self)(
+ self.lines,
+ self.cursor,
+ self.overlays + (ScreenOverlay(y, tuple(lines)),),
+ )
+
+ @property
+ def screen_lines(self) -> tuple[str, ...]:
+ return tuple(line.text for line in self.composed_lines)
+
+
+@dataclass(frozen=True, slots=True)
+class LineDiff:
+ """The changed region between an old and new version of one screen row.
+
+ When the user types ``e`` so the row changes from
+ ``>>> nam`` to ``>>> name``::
+
+ >>> n a m old
+ >>> n a m e new
+ ╰─╯
+ start_cell=7, new_cells=("m","e"), old_cells=("m",)
+ """
+
+ start_cell: int
+ start_x: int
+ old_cells: tuple[RenderCell, ...]
+ new_cells: tuple[RenderCell, ...]
+ old_width: int
+ new_width: int
+
+ @property
+ def old_text(self) -> str:
+ return render_cells(self.old_cells)
+
+ @property
+ def new_text(self) -> str:
+ return render_cells(self.new_cells)
+
+ @property
+ def old_changed_width(self) -> int:
+ return sum(cell.width for cell in self.old_cells)
+
+ @property
+ def new_changed_width(self) -> int:
+ return sum(cell.width for cell in self.new_cells)
+
+
+EMPTY_RENDER_LINE = RenderLine(cells=(), text="", width=0)
+
+
+@dataclass(frozen=True, slots=True)
+class LineUpdate:
+ kind: LineUpdateKind
+ y: int
+ start_cell: int
+ start_x: int
+ """Screen x-coordinate where the update begins. Used for cursor positioning."""
+ cells: tuple[RenderCell, ...]
+ char_width: int = 0
+ clear_eol: bool = False
+ reset_to_margin: bool = False
+ """If True, the console must resync the cursor position after writing
+ (needed when cells contain non-SGR escape sequences that may move the cursor)."""
+ text: str = field(init=False, default="")
+
+ def __post_init__(self) -> None:
+ object.__setattr__(self, "text", render_cells(self.cells))
+
+
+def _controls_require_cursor_resync(controls: Sequence[str]) -> bool:
+ # Anything beyond SGR means the cursor may no longer be where we left it.
+ return any(not control.endswith("m") for control in controls)
+
+
+def requires_cursor_resync(cells: Sequence[RenderCell]) -> bool:
+ return any(_controls_require_cursor_resync(cell.controls) for cell in cells)
+
+
+def render_cells(
+ cells: Sequence[RenderCell],
+ visual_style: str | None = None,
+) -> str:
+ """Render a sequence of cells into a terminal string with SGR escapes.
+
+ Tracks the active SGR state to emit resets only when the style
+ actually changes, minimizing output bytes.
+
+ If *visual_style* is given (used by redraw visualization), it is appended
+ to every cell's style.
+ """
+ rendered: list[str] = []
+ active_escape = ""
+ for cell in cells:
+ if cell.controls:
+ rendered.extend(cell.controls)
+ if not cell.text:
+ continue
+
+ target_escape = _style_escape(cell.style)
+ if visual_style is not None:
+ target_escape += visual_style
+ if target_escape != active_escape:
+ if active_escape:
+ rendered.append("\x1b[0m")
+ if target_escape:
+ rendered.append(target_escape)
+ active_escape = target_escape
+ rendered.append(cell.text)
+
+ if active_escape:
+ rendered.append("\x1b[0m")
+ return "".join(rendered)
+
+
+def diff_render_lines(old: RenderLine, new: RenderLine) -> LineDiff | None:
+ if old == new:
+ return None
+
+ prefix = 0
+ start_x = 0
+ max_prefix = min(len(old.cells), len(new.cells))
+ while prefix < max_prefix and old.cells[prefix] == new.cells[prefix]:
+ # Stop at any cell with non-SGR controls, since those might affect
+ # cursor position and must be re-emitted.
+ if old.cells[prefix].controls:
+ break
+ start_x += old.cells[prefix].width
+ prefix += 1
+
+ old_suffix = len(old.cells)
+ new_suffix = len(new.cells)
+ while old_suffix > prefix and new_suffix > prefix:
+ old_cell = old.cells[old_suffix - 1]
+ new_cell = new.cells[new_suffix - 1]
+ if old_cell.controls or new_cell.controls or old_cell != new_cell:
+ break
+ old_suffix -= 1
+ new_suffix -= 1
+
+ # Extend diff range to include trailing zero-width combining characters,
+ # so we never render a combining char without its base character.
+ while old_suffix < len(old.cells) and old.cells[old_suffix].width == 0:
+ old_suffix += 1
+ while new_suffix < len(new.cells) and new.cells[new_suffix].width == 0:
+ new_suffix += 1
+
+ return LineDiff(
+ start_cell=prefix,
+ start_x=start_x,
+ old_cells=old.cells[prefix:old_suffix],
+ new_cells=new.cells[prefix:new_suffix],
+ old_width=old.width,
+ new_width=new.width,
+ )
if r.input_trans is r.isearch_trans:
r.do_cmd(("isearch-end", [""]))
r.pos = len(r.get_unicode())
- r.dirty = True
+ r.invalidate_full()
r.refresh()
console.write("\nKeyboardInterrupt\n")
console.resetbuffer()
line = line.format(*k, **kw)
trace_file.write(line + "\n")
trace_file.flush()
+
+
+def trace_text(text: str, limit: int = 60) -> str:
+ if len(text) > limit:
+ text = text[:limit] + "..."
+ return repr(text)
type SimpleContextManager = Iterator[None]
type KeySpec = str # like r"\C-c"
type CommandName = str # like "interrupt"
-type EventTuple = tuple[CommandName, str]
+type EventData = list[str]
+type EventTuple = tuple[CommandName, EventData]
+type CursorXY = tuple[int, int]
+type Dimensions = tuple[int, int]
+type ScreenInfoRow = tuple[int, list[int]]
+type Keymap = tuple[tuple[KeySpec, CommandName], ...]
type Completer = Callable[[str, int], str | None]
type CharBuffer = list[str]
type CharWidths = list[int]
import time
import types
import platform
+from collections.abc import Callable
+from dataclasses import dataclass
from fcntl import ioctl
+from typing import TYPE_CHECKING, cast, overload
from . import terminfo
from .console import Console, Event
from .fancy_termios import tcgetattr, tcsetattr, TermState
-from .trace import trace
+from .render import (
+ EMPTY_RENDER_LINE,
+ LineUpdate,
+ RenderLine,
+ RenderedScreen,
+ requires_cursor_resync,
+ diff_render_lines,
+ render_cells,
+)
+from .trace import trace, trace_text
from .unix_eventqueue import EventQueue
-from .utils import wlen
# declare posix optional to allow None assignment on other platforms
posix: types.ModuleType | None
except ImportError:
posix = None
-TYPE_CHECKING = False
-
# types
if TYPE_CHECKING:
- from typing import AbstractSet, IO, Literal, overload, cast
-else:
- overload = lambda func: None
- cast = lambda typ, val: val
+ from typing import AbstractSet, IO, Literal
+
+type _MoveFunc = Callable[[int, int], None]
+type _PendingWrite = tuple[str | bytes, bool]
class InvalidTerminal(RuntimeError):
poll = MinimalPoll # type: ignore[assignment]
+@dataclass(frozen=True, slots=True)
+class UnixRefreshPlan:
+ """Instructions for updating the terminal after a screen change.
+
+ After the user types ``e`` to complete ``name``::
+
+ Before: >>> def greet(nam|):
+ ▲
+ LineUpdate here: insert_char "e"
+
+ After: >>> def greet(name|):
+ ▲
+
+ Only the changed cells are sent to the terminal; unchanged rows
+ are skipped entirely.
+ """
+
+ grow_lines: int
+ """Number of blank lines to append at the bottom to accommodate new content."""
+ use_tall_mode: bool
+ """Use absolute cursor addressing via ``cup`` instead of relative moves.
+ Activated when content exceeds one screen height."""
+ offset: int
+ """Vertical scroll offset: the buffer row displayed at the top of the terminal window."""
+ reverse_scroll: int
+ """Number of lines to scroll backwards (content moves down)."""
+ forward_scroll: int
+ """Number of lines to scroll forwards (content moves up)."""
+ line_updates: tuple[LineUpdate, ...]
+ cleared_lines: tuple[int, ...]
+ """Row indices to erase (old content with no replacement)."""
+ rendered_screen: RenderedScreen
+ cursor: tuple[int, int]
+
+
class UnixConsole(Console):
+ __buffer: list[_PendingWrite]
+ __gone_tall: bool
+ __move: _MoveFunc
+ __offset: int
+
def __init__(
self,
f_in: IO[bytes] | int = 0,
self.event_queue = EventQueue(
self.input_fd, self.encoding, self.terminfo
)
- self.cursor_visible = 1
+ self.cursor_visible = True
signal.signal(signal.SIGCONT, self._sigcont_handler)
"""
self.encoding = encoding
- def refresh(self, screen, c_xy):
+ def refresh(self, rendered_screen: RenderedScreen) -> None:
"""
Refresh the console screen.
Parameters:
- - screen (list): List of strings representing the screen contents.
- - c_xy (tuple): Cursor position (x, y) on the screen.
+ - rendered_screen: Structured rendered screen contents and cursor.
"""
+ c_xy = rendered_screen.cursor
+ trace(
+ "unix.refresh start cursor={cursor} lines={lines} prev_lines={prev_lines} "
+ "offset={offset} posxy={posxy}",
+ cursor=c_xy,
+ lines=len(rendered_screen.composed_lines),
+ prev_lines=len(self._rendered_screen.composed_lines),
+ offset=self.__offset,
+ posxy=self.posxy,
+ )
+ plan = self.__plan_refresh(rendered_screen, c_xy)
+ self.__apply_refresh_plan(plan)
+
+ def __plan_refresh(
+ self,
+ rendered_screen: RenderedScreen,
+ c_xy: tuple[int, int],
+ ) -> UnixRefreshPlan:
cx, cy = c_xy
- if not self.__gone_tall:
- while len(self.screen) < min(len(screen), self.height):
- self.__hide_cursor()
- if self.screen:
- self.__move(0, len(self.screen) - 1)
- self.__write("\n")
- self.posxy = 0, len(self.screen)
- self.screen.append("")
- else:
- while len(self.screen) < len(screen):
- self.screen.append("")
+ height = self.height
+ old_offset = offset = self.__offset
+ prev_composed = self._rendered_screen.composed_lines
+ previous_lines = list(prev_composed)
+ next_lines = list(rendered_screen.composed_lines)
+ line_count = len(next_lines)
- if len(screen) > self.height:
- self.__gone_tall = 1
- self.__move = self.__move_tall
+ grow_lines = 0
+ if not self.__gone_tall:
+ grow_lines = max(
+ min(line_count, height) - len(prev_composed),
+ 0,
+ )
+ previous_lines.extend([EMPTY_RENDER_LINE] * grow_lines)
+ elif len(previous_lines) < line_count:
+ previous_lines.extend([EMPTY_RENDER_LINE] * (line_count - len(previous_lines)))
- px, py = self.posxy
- old_offset = offset = self.__offset
- height = self.height
+ use_tall_mode = self.__gone_tall or line_count > height
# we make sure the cursor is on the screen, and that we're
# using all of the screen if we can
offset = cy
elif cy >= offset + height:
offset = cy - height + 1
- elif offset > 0 and len(screen) < offset + height:
- offset = max(len(screen) - height, 0)
- screen.append("")
+ elif offset > 0 and line_count < offset + height:
+ offset = max(line_count - height, 0)
+ next_lines.append(EMPTY_RENDER_LINE)
- oldscr = self.screen[old_offset : old_offset + height]
- newscr = screen[offset : offset + height]
+ oldscr = previous_lines[old_offset : old_offset + height]
+ newscr = next_lines[offset : offset + height]
- # use hardware scrolling if we have it.
+ reverse_scroll = 0
+ forward_scroll = 0
if old_offset > offset and self._ri:
+ reverse_scroll = old_offset - offset
+ for _ in range(reverse_scroll):
+ if oldscr:
+ oldscr.pop(-1)
+ oldscr.insert(0, EMPTY_RENDER_LINE)
+ elif old_offset < offset and self._ind:
+ forward_scroll = offset - old_offset
+ for _ in range(forward_scroll):
+ if oldscr:
+ oldscr.pop(0)
+ oldscr.append(EMPTY_RENDER_LINE)
+
+ line_updates: list[LineUpdate] = []
+ px, _ = self.posxy
+ for y, oldline, newline in zip(range(offset, offset + height), oldscr, newscr):
+ update = self.__plan_changed_line(y, oldline, newline, px)
+ if update is not None:
+ line_updates.append(update)
+
+ cleared_lines = tuple(range(offset + len(newscr), offset + len(oldscr)))
+ console_rendered_screen = RenderedScreen(tuple(next_lines), c_xy)
+ trace(
+ "unix.refresh plan grow={grow} tall={tall} offset={offset} "
+ "reverse_scroll={reverse_scroll} forward_scroll={forward_scroll} "
+ "updates={updates} clears={clears}",
+ grow=grow_lines,
+ tall=use_tall_mode,
+ offset=offset,
+ reverse_scroll=reverse_scroll,
+ forward_scroll=forward_scroll,
+ updates=len(line_updates),
+ clears=len(cleared_lines),
+ )
+ return UnixRefreshPlan(
+ grow_lines=grow_lines,
+ use_tall_mode=use_tall_mode,
+ offset=offset,
+ reverse_scroll=reverse_scroll,
+ forward_scroll=forward_scroll,
+ line_updates=tuple(line_updates),
+ cleared_lines=cleared_lines,
+ rendered_screen=console_rendered_screen,
+ cursor=(cx, cy),
+ )
+
+ def __apply_refresh_plan(self, plan: UnixRefreshPlan) -> None:
+ cx, cy = plan.cursor
+ trace(
+ "unix.refresh apply cursor={cursor} updates={updates} clears={clears}",
+ cursor=plan.cursor,
+ updates=len(plan.line_updates),
+ clears=len(plan.cleared_lines),
+ )
+ visual_style = self.begin_redraw_visualization()
+ screen_line_count = len(self._rendered_screen.composed_lines)
+
+ for _ in range(plan.grow_lines):
+ self.__hide_cursor()
+ if screen_line_count:
+ self.__move(0, screen_line_count - 1)
+ self.__write("\n")
+ self.posxy = 0, screen_line_count
+ screen_line_count += 1
+
+ if plan.use_tall_mode and not self.__gone_tall:
+ self.__gone_tall = True
+ self.__move = self.__move_tall
+
+ old_offset = self.__offset
+ if plan.reverse_scroll:
self.__hide_cursor()
self.__write_code(self._cup, 0, 0)
self.posxy = 0, old_offset
- for i in range(old_offset - offset):
+ for _ in range(plan.reverse_scroll):
self.__write_code(self._ri)
- oldscr.pop(-1)
- oldscr.insert(0, "")
- elif old_offset < offset and self._ind:
+ elif plan.forward_scroll:
self.__hide_cursor()
self.__write_code(self._cup, self.height - 1, 0)
self.posxy = 0, old_offset + self.height - 1
- for i in range(offset - old_offset):
+ for _ in range(plan.forward_scroll):
self.__write_code(self._ind)
- oldscr.pop(0)
- oldscr.append("")
- self.__offset = offset
+ self.__offset = plan.offset
- for (
- y,
- oldline,
- newline,
- ) in zip(range(offset, offset + height), oldscr, newscr):
- if oldline != newline:
- self.__write_changed_line(y, oldline, newline, px)
+ for update in plan.line_updates:
+ self.__apply_line_update(update, visual_style)
- y = len(newscr)
- while y < len(oldscr):
+ for y in plan.cleared_lines:
self.__hide_cursor()
self.__move(0, y)
self.posxy = 0, y
self.__write_code(self._el)
- y += 1
self.__show_cursor()
-
- self.screen = screen.copy()
self.move_cursor(cx, cy)
self.flushoutput()
+ self.sync_rendered_screen(plan.rendered_screen, self.posxy)
- def move_cursor(self, x, y):
+ def move_cursor(self, x: int, y: int) -> None:
"""
Move the cursor to the specified position on the screen.
- y (int): Y coordinate.
"""
if y < self.__offset or y >= self.__offset + self.height:
- self.event_queue.insert(Event("scroll", None))
+ trace(
+ "unix.move_cursor offscreen x={x} y={y} offset={offset} height={height}",
+ x=x,
+ y=y,
+ offset=self.__offset,
+ height=self.height,
+ )
+ self.event_queue.insert(Event("scroll", ""))
else:
+ trace("unix.move_cursor x={x} y={y}", x=x, y=y)
self.__move(x, y)
self.posxy = x, y
self.flushoutput()
- def prepare(self):
+ def prepare(self) -> None:
"""
Prepare the console for input/output operations.
"""
+ trace("unix.prepare")
self.__buffer = []
self.__svtermstate = tcgetattr(self.input_fd)
raw.iflag |= termios.BRKINT
raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN)
raw.lflag |= termios.ISIG
- raw.cc[termios.VMIN] = 1
- raw.cc[termios.VTIME] = 0
+ raw.cc[termios.VMIN] = b"\x01"
+ raw.cc[termios.VTIME] = b"\x00"
self.__input_fd_set(raw)
- # In macOS terminal we need to deactivate line wrap via ANSI escape code
+ # Apple Terminal will re-wrap lines for us unless we preempt the
+ # damage.
if self.is_apple_terminal:
os.write(self.output_fd, b"\033[?7l")
- self.screen = []
self.height, self.width = self.getheightwidth()
self.posxy = 0, 0
- self.__gone_tall = 0
+ self.__gone_tall = False
self.__move = self.__move_short
self.__offset = 0
+ self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
self.__maybe_write_code(self._smkx)
self.__enable_bracketed_paste()
- def restore(self):
+ def restore(self) -> None:
"""
Restore the console to the default state
"""
+ trace("unix.restore")
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
or bool(self.pollob.poll(timeout))
)
- def set_cursor_vis(self, visible):
+ def set_cursor_vis(self, visible: bool) -> None:
"""
Set the visibility of the cursor.
"""
Finish console operations and flush the output buffer.
"""
- y = len(self.screen) - 1
- while y >= 0 and not self.screen[y]:
+ rendered_lines = self._rendered_screen.composed_lines
+ y = len(rendered_lines) - 1
+ while y >= 0 and not rendered_lines[y].text:
y -= 1
self.__move(0, min(y, self.height + self.__offset - 1))
self.__write("\n\r")
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
- e.raw += e.raw
+ e.raw += e2.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
trace("getpending({a})", a=amount)
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
- e.raw += e.raw
+ e.raw += e2.raw
amount = 10000
raw = self.__read(amount)
"""
Clear the console screen.
"""
+ trace("unix.clear")
self.__write_code(self._clear)
- self.__gone_tall = 1
+ self.__gone_tall = True
self.__move = self.__move_tall
self.posxy = 0, 0
- self.screen = []
+ self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
@property
def input_hook(self):
self.__move = self.__move_short
- def __write_changed_line(self, y, oldline, newline, px_coord):
- # this is frustrating; there's no reason to test (say)
- # self.dch1 inside the loop -- but alternative ways of
- # structuring this function are equally painful (I'm trying to
- # avoid writing code generators these days...)
- minlen = min(wlen(oldline), wlen(newline))
- x_pos = 0
- x_coord = 0
-
- px_pos = 0
- j = 0
- for c in oldline:
- if j >= px_coord:
- break
- j += wlen(c)
- px_pos += 1
-
- # reuse the oldline as much as possible, but stop as soon as we
- # encounter an ESCAPE, because it might be the start of an escape
- # sequence
- while (
- x_coord < minlen
- and oldline[x_pos] == newline[x_pos]
- and newline[x_pos] != "\x1b"
- ):
- x_coord += wlen(newline[x_pos])
- x_pos += 1
+ @staticmethod
+ def __cell_index_from_x(line: RenderLine, x_coord: int) -> int:
+ width = 0
+ index = 0
+ while index < len(line.cells) and width < x_coord:
+ width += line.cells[index].width
+ index += 1
+ return index
+
+ def __plan_changed_line(
+ self,
+ y: int,
+ oldline: RenderLine,
+ newline: RenderLine,
+ px_coord: int,
+ ) -> LineUpdate | None:
+ # NOTE: The shared replace_char / replace_span / rewrite_suffix logic
+ # is duplicated in WindowsConsole.__plan_changed_line. Keep changes to
+ # these common cases synchronised between the two files. Yes, this is
+ # duplicated on purpose; the two backends agree just enough to make a
+ # shared helper a trap. Unix-only cases (insert_char, delete_then_insert)
+ # rely on terminal capabilities (ich1/dch1) that are unavailable on
+ # Windows.
+ diff = diff_render_lines(oldline, newline)
+ if diff is None:
+ return None
- # if we need to insert a single character right after the first detected change
- if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1:
+ start_cell = diff.start_cell
+ start_x = diff.start_x
+
+ if (
+ self.ich1
+ and not diff.old_cells
+ and (visible_new_cells := tuple(
+ cell for cell in diff.new_cells if cell.width
+ ))
+ and len(visible_new_cells) == 1
+ and all(cell.width == 0 for cell in diff.new_cells[1:])
+ and oldline.cells[start_cell:] == newline.cells[start_cell + 1 :]
+ ):
+ px_cell = self.__cell_index_from_x(oldline, px_coord)
if (
y == self.posxy[1]
- and x_coord > self.posxy[0]
- and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1]
+ and start_x > self.posxy[0]
+ and oldline.cells[px_cell:start_cell]
+ == newline.cells[px_cell + 1 : start_cell + 1]
):
- x_pos = px_pos
- x_coord = px_coord
- character_width = wlen(newline[x_pos])
- self.__move(x_coord, y)
- self.__write_code(self.ich1)
- self.__write(newline[x_pos])
- self.posxy = x_coord + character_width, y
-
- # if it's a single character change in the middle of the line
- elif (
- x_coord < minlen
- and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
- and wlen(oldline[x_pos]) == wlen(newline[x_pos])
+ start_cell = px_cell
+ start_x = px_coord
+ planned_cells = diff.new_cells
+ changed_cell = visible_new_cells[0]
+ return LineUpdate(
+ kind="insert_char",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=planned_cells,
+ char_width=changed_cell.width,
+ reset_to_margin=requires_cursor_resync(planned_cells),
+ )
+
+ if (
+ len(diff.old_cells) == 1
+ and len(diff.new_cells) == 1
+ and diff.old_cells[0].width == diff.new_cells[0].width
):
- character_width = wlen(newline[x_pos])
- self.__move(x_coord, y)
- self.__write(newline[x_pos])
- self.posxy = x_coord + character_width, y
-
- # if this is the last character to fit in the line and we edit in the middle of the line
- elif (
+ planned_cells = diff.new_cells
+ changed_cell = planned_cells[0]
+ return LineUpdate(
+ kind="replace_char",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=planned_cells,
+ char_width=changed_cell.width,
+ reset_to_margin=requires_cursor_resync(planned_cells),
+ )
+
+ if diff.old_changed_width == diff.new_changed_width:
+ planned_cells = diff.new_cells
+ return LineUpdate(
+ kind="replace_span",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=planned_cells,
+ char_width=diff.new_changed_width,
+ reset_to_margin=requires_cursor_resync(planned_cells),
+ )
+
+ if (
self.dch1
and self.ich1
- and wlen(newline) == self.width
- and x_coord < wlen(newline) - 2
- and newline[x_pos + 1 : -1] == oldline[x_pos:-2]
+ and newline.width == self.width
+ and start_x < newline.width - 2
+ and newline.cells[start_cell + 1 : -1] == oldline.cells[start_cell:-2]
):
+ planned_cells = (newline.cells[start_cell],)
+ changed_cell = planned_cells[0]
+ return LineUpdate(
+ kind="delete_then_insert",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=planned_cells,
+ char_width=changed_cell.width,
+ reset_to_margin=requires_cursor_resync(planned_cells),
+ )
+
+ suffix_cells = newline.cells[start_cell:]
+ return LineUpdate(
+ kind="rewrite_suffix",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=suffix_cells,
+ char_width=sum(cell.width for cell in suffix_cells),
+ clear_eol=oldline.width > newline.width,
+ reset_to_margin=requires_cursor_resync(suffix_cells),
+ )
+
+ def __apply_line_update(
+ self,
+ update: LineUpdate,
+ visual_style: str | None = None,
+ ) -> None:
+ text = render_cells(update.cells, visual_style) if visual_style else update.text
+ trace(
+ "unix.refresh update kind={kind} y={y} x={x} text={text} "
+ "clear_eol={clear_eol} reset_to_margin={reset}",
+ kind=update.kind,
+ y=update.y,
+ x=update.start_x,
+ text=trace_text(text),
+ clear_eol=update.clear_eol,
+ reset=update.reset_to_margin,
+ )
+ if update.kind == "insert_char":
+ self.__move(update.start_x, update.y)
+ self.__write_code(self.ich1)
+ self.__write(text)
+ self.posxy = update.start_x + update.char_width, update.y
+ elif update.kind in {"replace_char", "replace_span"}:
+ self.__move(update.start_x, update.y)
+ self.__write(text)
+ self.posxy = update.start_x + update.char_width, update.y
+ elif update.kind == "delete_then_insert":
self.__hide_cursor()
- self.__move(self.width - 2, y)
- self.posxy = self.width - 2, y
+ self.__move(self.width - 2, update.y)
+ self.posxy = self.width - 2, update.y
self.__write_code(self.dch1)
-
- character_width = wlen(newline[x_pos])
- self.__move(x_coord, y)
+ self.__move(update.start_x, update.y)
self.__write_code(self.ich1)
- self.__write(newline[x_pos])
- self.posxy = character_width + 1, y
-
+ self.__write(text)
+ self.posxy = update.start_x + update.char_width, update.y
else:
self.__hide_cursor()
- self.__move(x_coord, y)
- if wlen(oldline) > wlen(newline):
+ self.__move(update.start_x, update.y)
+ if update.clear_eol:
self.__write_code(self._el)
- self.__write(newline[x_pos:])
- self.posxy = wlen(newline), y
+ self.__write(text)
+ self.posxy = update.start_x + update.char_width, update.y
- if "\x1b" in newline:
- # ANSI escape characters are present, so we can't assume
- # anything about the position of the cursor. Moving the cursor
- # to the left margin should work to get to a known position.
- self.move_cursor(0, y)
+ if update.reset_to_margin:
+ # Non-SGR terminal controls can affect the cursor position.
+ self.move_cursor(0, update.y)
def __write(self, text):
- self.__buffer.append((text, 0))
+ self.__buffer.append((text, False))
def __write_code(self, fmt, *args):
- self.__buffer.append((terminfo.tparm(fmt, *args), 1))
+ self.__buffer.append((terminfo.tparm(fmt, *args), True))
def __maybe_write_code(self, fmt, *args):
if fmt:
self.__write_code(self._cup, y - self.__offset, x)
def __sigwinch(self, signum, frame):
- self.event_queue.insert(Event("resize", None))
+ self.event_queue.insert(Event("resize", ""))
def __hide_cursor(self):
if self.cursor_visible:
self.__maybe_write_code(self._civis)
- self.cursor_visible = 0
+ self.cursor_visible = False
def __show_cursor(self):
if not self.cursor_visible:
self.__maybe_write_code(self._cnorm)
- self.cursor_visible = 1
+ self.cursor_visible = True
def repaint(self):
+ composed = self._rendered_screen.composed_lines
+ trace(
+ "unix.repaint gone_tall={gone_tall} screen_lines={lines} offset={offset}",
+ gone_tall=self.__gone_tall,
+ lines=len(composed),
+ offset=self.__offset,
+ )
if not self.__gone_tall:
self.posxy = 0, self.posxy[1]
self.__write("\r")
- ns = len(self.screen) * ["\000" * self.width]
- self.screen = ns
+ ns = len(composed) * ["\000" * self.width]
else:
self.posxy = 0, self.__offset
self.__move(0, self.__offset)
ns = self.height * ["\000" * self.width]
- self.screen = ns
+ self.sync_rendered_screen(
+ RenderedScreen.from_screen_lines(ns, self.posxy),
+ self.posxy,
+ )
def __tputs(self, fmt, prog=delayprog):
"""A Python implementation of the curses tputs function; the
import _colorize
from collections import deque
+from dataclasses import dataclass
from io import StringIO
from tokenize import TokenInfo as TI
from typing import Iterable, Iterator, Match, NamedTuple, Self
tag: str
+class StyledChar(NamedTuple):
+ text: str
+ width: int
+ tag: str | None = None
+
+
+def _ascii_control_repr(c: str) -> str | None:
+ code = ord(c)
+ if code < 32:
+ return "^" + chr(code + 64)
+ if code == 127:
+ return "^?"
+ return None
+
+
@functools.cache
def str_width(c: str) -> int:
if ord(c) < 128:
return False
+def iter_display_chars(
+ buffer: str,
+ colors: list[ColorSpan] | None = None,
+ start_index: int = 0,
+) -> Iterator[StyledChar]:
+ """Yield visible display characters with widths and semantic color tags.
+
+ Note: ``colors`` is consumed in place as spans are processed -- callers
+ that split a buffer across multiple calls rely on this mutation to track
+ which spans have already been handled.
+ """
+
+ if not buffer:
+ return
+
+ color_idx = 0
+ if colors:
+ while color_idx < len(colors) and colors[color_idx].span.end < start_index:
+ color_idx += 1
+
+ active_tag = None
+ if colors and color_idx < len(colors) and colors[color_idx].span.start < start_index:
+ active_tag = colors[color_idx].tag
+
+ for i, c in enumerate(buffer, start_index):
+ if colors and color_idx < len(colors) and colors[color_idx].span.start == i:
+ active_tag = colors[color_idx].tag
+
+ if control := _ascii_control_repr(c):
+ text = control
+ width = len(control)
+ elif ord(c) < 128:
+ text = c
+ width = 1
+ elif unicodedata.category(c).startswith("C"):
+ text = r"\u%04x" % ord(c)
+ width = len(text)
+ else:
+ text = c
+ width = str_width(c)
+
+ yield StyledChar(text, width, active_tag)
+
+ if colors and color_idx < len(colors) and colors[color_idx].span.end == i:
+ color_idx += 1
+ active_tag = None
+ # Check if the next span starts at the same position
+ if color_idx < len(colors) and colors[color_idx].span.start == i:
+ active_tag = colors[color_idx].tag
+
+ # Remove consumed spans so callers see the mutation
+ if color_idx > 0 and colors:
+ del colors[:color_idx]
+
+
def disp_str(
buffer: str,
colors: list[ColorSpan] | None = None,
(['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])
"""
+ styled_chars = list(iter_display_chars(buffer, colors, start_index))
chars: CharBuffer = []
char_widths: CharWidths = []
-
- if not buffer:
- return chars, char_widths
-
- while colors and colors[0].span.end < start_index:
- # move past irrelevant spans
- colors.pop(0)
-
theme = THEME(force_color=force_color)
- pre_color = ""
- post_color = ""
- if colors and colors[0].span.start < start_index:
- # looks like we're continuing a previous color (e.g. a multiline str)
- pre_color = theme[colors[0].tag]
-
- for i, c in enumerate(buffer, start_index):
- if colors and colors[0].span.start == i: # new color starts now
- pre_color = theme[colors[0].tag]
-
- if c == "\x1a": # CTRL-Z on Windows
- chars.append(c)
- char_widths.append(2)
- elif ord(c) < 128:
- chars.append(c)
- char_widths.append(1)
- elif unicodedata.category(c).startswith("C"):
- c = r"\u%04x" % ord(c)
- chars.append(c)
- char_widths.append(len(c))
- else:
- chars.append(c)
- char_widths.append(str_width(c))
-
- if colors and colors[0].span.end == i: # current color ends now
- post_color = theme.reset
- colors.pop(0)
-
- chars[-1] = pre_color + chars[-1] + post_color
- pre_color = ""
- post_color = ""
- if colors and colors[0].span.start < i and colors[0].span.end > i:
- # even though the current color should be continued, reset it for now.
- # the next call to `disp_str()` will revive it.
- chars[-1] += theme.reset
+ for index, styled_char in enumerate(styled_chars):
+ previous_tag = styled_chars[index - 1].tag if index else None
+ next_tag = styled_chars[index + 1].tag if index + 1 < len(styled_chars) else None
+ prefix = theme[styled_char.tag] if styled_char.tag and styled_char.tag != previous_tag else ""
+ suffix = theme.reset if styled_char.tag and styled_char.tag != next_tag else ""
+ chars.append(prefix + styled_char.text + suffix)
+ char_widths.append(styled_char.width)
return chars, char_widths
"""
iterator = iter(iterable)
- window = deque((None, next(iterator)), maxlen=3)
+ try:
+ first = next(iterator)
+ except StopIteration:
+ return
+ window = deque((None, first), maxlen=3)
try:
for x in iterator:
window.append(x)
yield tuple(window)
- except Exception:
- raise
finally:
window.append(None)
yield tuple(window)
+
+
+@dataclass(frozen=True, slots=True)
+class StyleRef:
+ tag: str | None = None # From THEME().syntax, e.g. "keyword", "builtin"
+ sgr: str = ""
+
+ @classmethod
+ def from_tag(cls, tag: str, sgr: str = "") -> Self:
+ return cls(tag=tag, sgr=sgr)
+
+ @classmethod
+ def from_sgr(cls, sgr: str) -> Self:
+ if not sgr:
+ return cls()
+ return cls(sgr=sgr)
+
+ @property
+ def is_plain(self) -> bool:
+ return self.tag is None and not self.sgr
import ctypes
import types
+from dataclasses import dataclass
from ctypes.wintypes import (
_COORD,
WORD,
SHORT,
)
from ctypes import Structure, POINTER, Union
+from typing import TYPE_CHECKING
from .console import Event, Console
-from .trace import trace
-from .utils import wlen
+from .render import (
+ EMPTY_RENDER_LINE,
+ LineUpdate,
+ RenderLine,
+ RenderedScreen,
+ requires_cursor_resync,
+ diff_render_lines,
+ render_cells,
+)
+from .trace import trace, trace_text
from .windows_eventqueue import EventQueue
try:
except ImportError:
nt = None
-TYPE_CHECKING = False
-
if TYPE_CHECKING:
from typing import IO
class _error(Exception):
pass
+
+@dataclass(frozen=True, slots=True)
+class WindowsRefreshPlan:
+ grow_lines: int
+ offset: int
+ scroll_lines: int
+ line_updates: tuple[LineUpdate, ...]
+ cleared_lines: tuple[int, ...]
+ rendered_screen: RenderedScreen
+ cursor: tuple[int, int]
+
def _supports_vt():
try:
return nt._supports_virtual_terminal()
):
raise WinError(get_last_error())
- self.screen: list[str] = []
self.width = 80
self.height = 25
self.__offset = 0
# Console I/O is redirected, fallback...
self.out = None
- def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
+ def refresh(self, rendered_screen: RenderedScreen) -> None:
"""
Refresh the console screen.
Parameters:
- - screen (list): List of strings representing the screen contents.
- - c_xy (tuple): Cursor position (x, y) on the screen.
+ - rendered_screen: Structured rendered screen contents and cursor.
"""
- cx, cy = c_xy
-
- while len(self.screen) < min(len(screen), self.height):
- self._hide_cursor()
- if self.screen:
- self._move_relative(0, len(self.screen) - 1)
- self.__write("\n")
- self.posxy = 0, len(self.screen)
- self.screen.append("")
+ c_xy = rendered_screen.cursor
+ trace(
+ "windows.refresh start cursor={cursor} lines={lines} prev_lines={prev_lines} "
+ "offset={offset} posxy={posxy}",
+ cursor=c_xy,
+ lines=len(rendered_screen.composed_lines),
+ prev_lines=len(self._rendered_screen.composed_lines),
+ offset=self.__offset,
+ posxy=self.posxy,
+ )
+ plan = self.__plan_refresh(rendered_screen, c_xy)
+ self.__apply_refresh_plan(plan)
- px, py = self.posxy
- old_offset = offset = self.__offset
+ def __plan_refresh(
+ self,
+ rendered_screen: RenderedScreen,
+ c_xy: tuple[int, int],
+ ) -> WindowsRefreshPlan:
+ cx, cy = c_xy
height = self.height
+ old_offset = offset = self.__offset
+ prev_composed = self._rendered_screen.composed_lines
+ previous_lines = list(prev_composed)
+ next_lines = list(rendered_screen.composed_lines)
+ line_count = len(next_lines)
+
+ grow_lines = max(
+ min(line_count, height) - len(prev_composed),
+ 0,
+ )
+ previous_lines.extend([EMPTY_RENDER_LINE] * grow_lines)
- # we make sure the cursor is on the screen, and that we're
- # using all of the screen if we can
+ scroll_lines = 0
if cy < offset:
offset = cy
elif cy >= offset + height:
offset = cy - height + 1
scroll_lines = offset - old_offset
+ previous_lines.extend([EMPTY_RENDER_LINE] * scroll_lines)
+ elif offset > 0 and line_count < offset + height:
+ offset = max(line_count - height, 0)
+ next_lines.append(EMPTY_RENDER_LINE)
+
+ oldscr = previous_lines[old_offset : old_offset + height]
+ newscr = next_lines[offset : offset + height]
+
+ line_updates: list[LineUpdate] = []
+ px, _ = self.posxy
+ for y, oldline, newline in zip(range(offset, offset + height), oldscr, newscr):
+ update = self.__plan_changed_line(y, oldline, newline, px)
+ if update is not None:
+ line_updates.append(update)
+
+ cleared_lines = tuple(range(offset + len(newscr), offset + len(oldscr)))
+ console_rendered_screen = RenderedScreen(tuple(next_lines), c_xy)
+ trace(
+ "windows.refresh plan grow={grow} offset={offset} scroll_lines={scroll_lines} "
+ "updates={updates} clears={clears}",
+ grow=grow_lines,
+ offset=offset,
+ scroll_lines=scroll_lines,
+ updates=len(line_updates),
+ clears=len(cleared_lines),
+ )
+ return WindowsRefreshPlan(
+ grow_lines=grow_lines,
+ offset=offset,
+ scroll_lines=scroll_lines,
+ line_updates=tuple(line_updates),
+ cleared_lines=cleared_lines,
+ rendered_screen=console_rendered_screen,
+ cursor=(cx, cy),
+ )
- # Scrolling the buffer as the current input is greater than the visible
- # portion of the window. We need to scroll the visible portion and the
- # entire history
- self._scroll(scroll_lines, self._getscrollbacksize())
- self.posxy = self.posxy[0], self.posxy[1] + scroll_lines
- self.__offset += scroll_lines
+ def __apply_refresh_plan(self, plan: WindowsRefreshPlan) -> None:
+ cx, cy = plan.cursor
+ trace(
+ "windows.refresh apply cursor={cursor} updates={updates} clears={clears}",
+ cursor=plan.cursor,
+ updates=len(plan.line_updates),
+ clears=len(plan.cleared_lines),
+ )
+ visual_style = self.begin_redraw_visualization()
+ screen_line_count = len(self._rendered_screen.composed_lines)
- for i in range(scroll_lines):
- self.screen.append("")
- elif offset > 0 and len(screen) < offset + height:
- offset = max(len(screen) - height, 0)
- screen.append("")
+ for _ in range(plan.grow_lines):
+ self._hide_cursor()
+ if screen_line_count:
+ self._move_relative(0, screen_line_count - 1)
+ self.__write("\n")
+ self.posxy = 0, screen_line_count
+ screen_line_count += 1
- oldscr = self.screen[old_offset : old_offset + height]
- newscr = screen[offset : offset + height]
+ if plan.scroll_lines:
+ self._scroll(plan.scroll_lines, self._getscrollbacksize())
+ self.posxy = self.posxy[0], self.posxy[1] + plan.scroll_lines
- self.__offset = offset
+ self.__offset = plan.offset
self._hide_cursor()
- for (
- y,
- oldline,
- newline,
- ) in zip(range(offset, offset + height), oldscr, newscr):
- if oldline != newline:
- self.__write_changed_line(y, oldline, newline, px)
-
- y = len(newscr)
- while y < len(oldscr):
+ for update in plan.line_updates:
+ self.__apply_line_update(update, visual_style)
+
+ for y in plan.cleared_lines:
self._move_relative(0, y)
self.posxy = 0, y
self._erase_to_end()
- y += 1
self._show_cursor()
-
- self.screen = screen
self.move_cursor(cx, cy)
+ self.sync_rendered_screen(plan.rendered_screen, self.posxy)
@property
def input_hook(self):
if nt is not None and nt._is_inputhook_installed():
return nt._inputhook
- def __write_changed_line(
- self, y: int, oldline: str, newline: str, px_coord: int
- ) -> None:
- minlen = min(wlen(oldline), wlen(newline))
- x_pos = 0
- x_coord = 0
-
- # reuse the oldline as much as possible, but stop as soon as we
- # encounter an ESCAPE, because it might be the start of an escape
- # sequence
- while (
- x_coord < minlen
- and oldline[x_pos] == newline[x_pos]
- and newline[x_pos] != "\x1b"
+ def __plan_changed_line( # keep in sync with UnixConsole.__plan_changed_line
+ self,
+ y: int,
+ oldline: RenderLine,
+ newline: RenderLine,
+ px_coord: int,
+ ) -> LineUpdate | None:
+ diff = diff_render_lines(oldline, newline)
+ if diff is None:
+ return None
+
+ start_cell = diff.start_cell
+ start_x = diff.start_x
+ if (
+ len(diff.old_cells) == 1
+ and len(diff.new_cells) == 1
+ and diff.old_cells[0].width == diff.new_cells[0].width
):
- x_coord += wlen(newline[x_pos])
- x_pos += 1
+ changed_cell = diff.new_cells[0]
+ # Ctrl-Z (SUB) can reach here via RenderLine.from_rendered_text()
+ # for prompt/message lines, which bypasses iter_display_chars().
+ # On Windows, raw \x1a causes console cursor anomalies, so we
+ # force a cursor resync when it appears.
+ return LineUpdate(
+ kind="replace_char",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=diff.new_cells,
+ char_width=changed_cell.width,
+ reset_to_margin=(
+ requires_cursor_resync(diff.new_cells)
+ or "\x1a" in changed_cell.text
+ ),
+ )
+
+ if diff.old_changed_width == diff.new_changed_width:
+ return LineUpdate(
+ kind="replace_span",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=diff.new_cells,
+ char_width=diff.new_changed_width,
+ reset_to_margin=(
+ requires_cursor_resync(diff.new_cells)
+ or any("\x1a" in cell.text for cell in diff.new_cells)
+ ),
+ )
+
+ suffix_cells = newline.cells[start_cell:]
+ return LineUpdate(
+ kind="rewrite_suffix",
+ y=y,
+ start_cell=start_cell,
+ start_x=start_x,
+ cells=suffix_cells,
+ char_width=sum(cell.width for cell in suffix_cells),
+ clear_eol=oldline.width > newline.width,
+ reset_to_margin=(
+ requires_cursor_resync(suffix_cells)
+ or any("\x1a" in cell.text for cell in suffix_cells)
+ ),
+ )
- self._hide_cursor()
- self._move_relative(x_coord, y)
- if wlen(oldline) > wlen(newline):
+ def __apply_line_update(
+ self,
+ update: LineUpdate,
+ visual_style: str | None = None,
+ ) -> None:
+ text = render_cells(update.cells, visual_style) if visual_style else update.text
+ trace(
+ "windows.refresh update kind={kind} y={y} x={x} text={text} "
+ "clear_eol={clear_eol} reset_to_margin={reset}",
+ kind=update.kind,
+ y=update.y,
+ x=update.start_x,
+ text=trace_text(text),
+ clear_eol=update.clear_eol,
+ reset=update.reset_to_margin,
+ )
+ original_y = self.posxy[1]
+ self._move_relative(update.start_x, update.y)
+ if update.clear_eol:
self._erase_to_end()
- self.__write(newline[x_pos:])
- self.posxy = min(wlen(newline), self.width - 1), y
+ self.__write(text)
+ self.posxy = min(update.start_x + update.char_width, self.width - 1), update.y
- if "\x1b" in newline or y != self.posxy[1] or '\x1a' in newline:
- # ANSI escape characters are present, so we can't assume
- # anything about the position of the cursor. Moving the cursor
- # to the left margin should work to get to a known position.
- self.move_cursor(0, y)
+ if update.reset_to_margin or update.y != original_y:
+ # Non-SGR terminal controls or vertical movement require a cursor sync.
+ self.move_cursor(0, update.y)
def _scroll(
self, top: int, bottom: int, left: int | None = None, right: int | None = None
def __write(self, text: str) -> None:
if "\x1a" in text:
- text = ''.join(["^Z" if x == '\x1a' else x for x in text])
+ text = text.replace("\x1a", "^Z")
if self.out is not None:
self.out.write(text.encode(self.encoding, "replace"))
self.__write(ERASE_IN_LINE)
def prepare(self) -> None:
- trace("prepare")
- self.screen = []
+ trace("windows.prepare")
self.height, self.width = self.getheightwidth()
self.posxy = 0, 0
self.__offset = 0
+ self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
if self.__vt_support:
if not SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT):
self._enable_bracketed_paste()
def restore(self) -> None:
+ trace("windows.restore")
if self.__vt_support:
# Recover to original mode before running REPL
self._disable_bracketed_paste()
raise ValueError(f"Bad cursor position {x}, {y}")
if y < self.__offset or y >= self.__offset + self.height:
+ trace(
+ "windows.move_cursor offscreen x={x} y={y} offset={offset} height={height}",
+ x=x,
+ y=y,
+ offset=self.__offset,
+ height=self.height,
+ )
self.event_queue.insert(Event("scroll", ""))
else:
+ trace("windows.move_cursor x={x} y={y}", x=x, y=y)
self._move_relative(x, y)
self.posxy = x, y
def clear(self) -> None:
"""Wipe the screen"""
+ trace("windows.clear")
self.__write(CLEAR)
self.posxy = 0, 0
- self.screen = []
+ self.sync_rendered_screen(RenderedScreen.empty(), self.posxy)
def finish(self) -> None:
"""Move the cursor to the end of the display and otherwise get
ready for end. XXX could be merged with restore? Hmm."""
- y = len(self.screen) - 1
- while y >= 0 and not self.screen[y]:
+ rendered_lines = self._rendered_screen.composed_lines
+ y = len(rendered_lines) - 1
+ while y >= 0 and not rendered_lines[y].text:
y -= 1
self._move_relative(0, min(y, self.height + self.__offset - 1))
self.__write("\r\n")
# ignore SHIFT_PRESSED and special keys
continue
if ch == "\r":
- ch += "\n"
+ ch = "\n"
e.data += ch
return e
)
def repaint(self) -> None:
+ trace("windows.repaint unsupported")
raise NotImplementedError("No repaint support")
from unittest.mock import MagicMock
from _pyrepl.console import Console, Event
+from _pyrepl.render import RenderLine, RenderedScreen
from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
from _pyrepl.simple_interact import _strip_final_indent
from _pyrepl.utils import unbracket, ANSI_ESCAPE_SEQUENCE
):
actual = clean_screen(reader) if clean else reader.screen
expected = expected.split("\n")
- self.assertListEqual(actual, expected)
+ if clean:
+ self.assertListEqual(actual, expected)
+ return
+
+ actual_lines = [RenderLine.from_rendered_text(line) for line in actual]
+ expected_lines = [RenderLine.from_rendered_text(line) for line in expected]
+ self.assertListEqual(actual_lines, expected_lines)
def multiline_input(reader: ReadlineAlikeReader, namespace: dict | None = None):
def __init__(self, events, encoding="utf-8") -> None:
self.events = iter(events)
self.encoding = encoding
- self.screen = []
+ self._rendered_screen = RenderedScreen.empty()
self.height = 100
self.width = 80
+ self.posxy = (0, 0)
+ self._redraw_visual_cycle = 0
def get_event(self, block: bool = True) -> Event | None:
return next(self.events)
def getheightwidth(self) -> tuple[int, int]:
return self.height, self.width
- def refresh(self, screen: list[str], xy: tuple[int, int]) -> None:
+ def refresh(self, rendered_screen: RenderedScreen) -> None:
pass
def prepare(self) -> None:
--- /dev/null
+from unittest import TestCase
+from _pyrepl.content import (
+ ContentFragment,
+ ContentLine,
+ PromptContent,
+ SourceLine,
+)
+from _pyrepl.layout import (
+ LayoutMap,
+ LayoutRow,
+ layout_content_lines,
+)
+
+
+def _source(text, lineno=0, start_offset=0, has_newline=True, cursor_index=None):
+ return SourceLine(lineno, text, start_offset, has_newline, cursor_index)
+
+
+def _prompt(text=">>> ", width=4):
+ return PromptContent((), text, width)
+
+
+def _body_from_text(text):
+ return tuple(ContentFragment(ch, 1) for ch in text)
+
+
+def _content_line(text, prompt=None, has_newline=True, start_offset=0):
+ if prompt is None:
+ prompt = _prompt()
+ body = _body_from_text(text)
+ source = _source(text, start_offset=start_offset, has_newline=has_newline)
+ return ContentLine(source, prompt, body)
+
+
+class TestLayoutRow(TestCase):
+ def test_width_basic(self):
+ row = LayoutRow(4, (1, 1, 1))
+ self.assertEqual(row.width, 7)
+
+ def test_width_with_suffix(self):
+ row = LayoutRow(4, (1, 1), suffix_width=1)
+ self.assertEqual(row.width, 7)
+
+ def test_screeninfo_without_suffix(self):
+ row = LayoutRow(4, (1, 1, 1))
+ prompt_w, widths = row.screeninfo
+ self.assertEqual(prompt_w, 4)
+ self.assertEqual(widths, [1, 1, 1])
+
+ def test_screeninfo_with_suffix(self):
+ row = LayoutRow(4, (1, 1), suffix_width=1)
+ prompt_w, widths = row.screeninfo
+ self.assertEqual(prompt_w, 4)
+ self.assertEqual(widths, [1, 1, 1])
+
+
+class TestLayoutMap(TestCase):
+ def test_empty(self):
+ lm = LayoutMap.empty()
+ self.assertEqual(len(lm.rows), 1)
+ self.assertEqual(lm.max_row(), 0)
+ self.assertEqual(lm.max_column(0), 0)
+
+ def test_screeninfo(self):
+ lm = LayoutMap((
+ LayoutRow(4, (1, 1, 1)),
+ LayoutRow(0, (1, 1)),
+ ))
+ info = lm.screeninfo
+ self.assertEqual(len(info), 2)
+ self.assertEqual(info[0], (4, [1, 1, 1]))
+ self.assertEqual(info[1], (0, [1, 1]))
+
+ def test_max_column(self):
+ lm = LayoutMap((LayoutRow(4, (1, 1, 1)),))
+ self.assertEqual(lm.max_column(0), 7)
+
+ def test_max_row(self):
+ lm = LayoutMap((LayoutRow(0, ()), LayoutRow(0, ())))
+ self.assertEqual(lm.max_row(), 1)
+
+ def test_pos_to_xy_empty_rows(self):
+ lm = LayoutMap(())
+ self.assertEqual(lm.pos_to_xy(0), (0, 0))
+
+ def test_pos_to_xy_single_row(self):
+ lm = LayoutMap((LayoutRow(4, (1, 1, 1), buffer_advance=3),))
+ self.assertEqual(lm.pos_to_xy(0), (4, 0))
+ self.assertEqual(lm.pos_to_xy(1), (5, 0))
+ self.assertEqual(lm.pos_to_xy(2), (6, 0))
+ self.assertEqual(lm.pos_to_xy(3), (7, 0))
+
+ def test_pos_to_xy_multi_row(self):
+ lm = LayoutMap((
+ LayoutRow(4, (1, 1, 1), buffer_advance=4),
+ LayoutRow(4, (1, 1), buffer_advance=2),
+ ))
+ # First row: pos 0-3
+ self.assertEqual(lm.pos_to_xy(0), (4, 0))
+ self.assertEqual(lm.pos_to_xy(3), (7, 0))
+ # Second row: pos 4-5
+ self.assertEqual(lm.pos_to_xy(4), (4, 1))
+ self.assertEqual(lm.pos_to_xy(5), (5, 1))
+
+ def test_pos_to_xy_past_end_clamps(self):
+ lm = LayoutMap((LayoutRow(4, (1, 1), buffer_advance=2),))
+ self.assertEqual(lm.pos_to_xy(99), (6, 0))
+
+ def test_pos_to_xy_skips_prompt_only_leading_rows(self):
+ lm = LayoutMap((
+ LayoutRow(0, (), buffer_advance=0), # leading prompt-only row
+ LayoutRow(4, (1, 1), buffer_advance=3),
+ ))
+ # pos 0 should skip the leading row and land on the real row
+ self.assertEqual(lm.pos_to_xy(0), (4, 1))
+
+ def test_pos_to_xy_with_suffix(self):
+ lm = LayoutMap((
+ LayoutRow(4, (1, 1), suffix_width=1, buffer_advance=2),
+ LayoutRow(0, (1,), buffer_advance=2),
+ ))
+ # pos=2 fits within first row's char_widths (len=2), cursor at end
+ self.assertEqual(lm.pos_to_xy(2), (6, 0))
+ # pos=3 exceeds first row, lands on second row
+ self.assertEqual(lm.pos_to_xy(3), (1, 1))
+
+ def test_xy_to_pos_empty_rows(self):
+ lm = LayoutMap(())
+ self.assertEqual(lm.xy_to_pos(0, 0), 0)
+
+ def test_xy_to_pos_single_row(self):
+ lm = LayoutMap((LayoutRow(4, (1, 1, 1), buffer_advance=3),))
+ self.assertEqual(lm.xy_to_pos(4, 0), 0)
+ self.assertEqual(lm.xy_to_pos(5, 0), 1)
+ self.assertEqual(lm.xy_to_pos(6, 0), 2)
+ self.assertEqual(lm.xy_to_pos(7, 0), 3)
+
+ def test_xy_to_pos_multi_row(self):
+ lm = LayoutMap((
+ LayoutRow(4, (1, 1, 1), buffer_advance=4),
+ LayoutRow(4, (1, 1), buffer_advance=2),
+ ))
+ self.assertEqual(lm.xy_to_pos(4, 0), 0)
+ self.assertEqual(lm.xy_to_pos(4, 1), 4)
+ self.assertEqual(lm.xy_to_pos(5, 1), 5)
+
+ def test_xy_to_pos_before_prompt_returns_zero(self):
+ lm = LayoutMap((LayoutRow(4, (1, 1), buffer_advance=2),))
+ self.assertEqual(lm.xy_to_pos(0, 0), 0)
+
+ def test_xy_to_pos_with_zero_width_chars(self):
+ # Simulates combining characters (zero-width) after a base char
+ lm = LayoutMap((LayoutRow(4, (1, 0, 1), buffer_advance=3),))
+ # x=5 is past the first char; trailing zero-width combining is included
+ self.assertEqual(lm.xy_to_pos(5, 0), 2)
+
+ def test_xy_to_pos_zero_width_skipped(self):
+ lm = LayoutMap((LayoutRow(0, (0, 1, 1), buffer_advance=3),))
+ # x=0: the zero-width char at index 0 is skipped, pos advances
+ self.assertEqual(lm.xy_to_pos(0, 0), 1)
+
+ def test_xy_to_pos_zero_width_before_target(self):
+ # Zero-width char between two normal chars; target x is past it
+ lm = LayoutMap((LayoutRow(0, (1, 0, 1), buffer_advance=3),))
+ # x=2: passes char at x=0 (w=1), skips zero-width at x=1, lands at x=2
+ self.assertEqual(lm.xy_to_pos(2, 0), 3)
+
+ def test_xy_to_pos_trailing_all_zero_width(self):
+ # All remaining chars from cursor position are zero-width
+ lm = LayoutMap((LayoutRow(0, (1, 0), buffer_advance=2),))
+ # x=1: past first char, trailing loop exhausts (all remaining are 0-width)
+ self.assertEqual(lm.xy_to_pos(1, 0), 2)
+
+
+class TestLayoutContentLines(TestCase):
+ def test_zero_width_returns_empty(self):
+ result = layout_content_lines((), 0, 0)
+ self.assertEqual(result.wrapped_rows, ())
+ self.assertEqual(result.layout_map.rows, ())
+
+ def test_negative_width_returns_empty(self):
+ result = layout_content_lines((), -1, 0)
+ self.assertEqual(result.wrapped_rows, ())
+
+ def test_single_short_line(self):
+ line = _content_line("abc")
+ result = layout_content_lines((line,), 80, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 1)
+ row = result.wrapped_rows[0]
+ self.assertEqual(row.prompt_text, ">>> ")
+ self.assertEqual(row.prompt_width, 4)
+ self.assertEqual(row.suffix, "")
+ self.assertEqual(row.buffer_advance, 4) # 3 chars + newline
+
+ def test_single_line_no_newline(self):
+ line = _content_line("ab", has_newline=False)
+ result = layout_content_lines((line,), 80, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 1)
+ self.assertEqual(result.wrapped_rows[0].buffer_advance, 2)
+
+ def test_empty_body(self):
+ source = _source("", has_newline=True)
+ prompt = _prompt()
+ line = ContentLine(source, prompt, ())
+ result = layout_content_lines((line,), 80, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 1)
+ self.assertEqual(result.wrapped_rows[0].buffer_advance, 1) # just newline
+
+ def test_line_wraps(self):
+ # prompt ">>> " is 4 wide, terminal is 10 wide, so 6 chars fit per row
+ line = _content_line("abcdefgh") # 8 chars, needs 2 rows
+ result = layout_content_lines((line,), 10, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 2)
+ first, second = result.wrapped_rows
+ self.assertEqual(first.prompt_text, ">>> ")
+ self.assertEqual(first.suffix, "\\")
+ self.assertEqual(first.suffix_width, 1)
+ # First row: 10 - 4(prompt) - 1(suffix) = 5 chars
+ self.assertEqual(first.buffer_advance, 5)
+ # Second row: continuation with no prompt
+ self.assertEqual(second.prompt_text, "")
+ self.assertEqual(second.prompt_width, 0)
+ self.assertEqual(second.buffer_advance, 4) # remaining 3 + newline
+ self.assertEqual(second.suffix, "")
+
+ def test_wrapping_forces_progress(self):
+ # When a single character is wider than available space, force 1 char
+ prompt = _prompt("P", 1)
+ body = (ContentFragment("W", 1),)
+ source = _source("W", has_newline=False)
+ line = ContentLine(source, prompt, body)
+ # width=2 means prompt(1) + char(1) = 2, which fits (< width would be
+ # false for width=2 since 1+1 >= 2), so it wraps but forces progress
+ result = layout_content_lines((line,), 2, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 1)
+ self.assertEqual(result.wrapped_rows[0].buffer_advance, 1)
+
+ def test_layout_map_matches_wrapped_rows(self):
+ line = _content_line("abc")
+ result = layout_content_lines((line,), 80, 0)
+
+ self.assertEqual(len(result.layout_map.rows), len(result.wrapped_rows))
+ self.assertEqual(result.layout_map.rows[0].prompt_width, 4)
+ self.assertEqual(result.layout_map.rows[0].char_widths, (1, 1, 1))
+
+ def test_line_end_offsets(self):
+ line1 = _content_line("ab")
+ line2 = _content_line("cd")
+ result = layout_content_lines((line1, line2), 80, 0)
+
+ self.assertEqual(len(result.line_end_offsets), 2)
+ # line1: 2 chars + 1 newline = offset 3
+ self.assertEqual(result.line_end_offsets[0], 3)
+ # line2: offset 3 + 2 chars + 1 newline = 6
+ self.assertEqual(result.line_end_offsets[1], 6)
+
+ def test_start_offset_shifts_offsets(self):
+ line = _content_line("ab")
+ result = layout_content_lines((line,), 80, 10)
+
+ self.assertEqual(result.line_end_offsets[0], 13)
+
+ def test_multiple_lines(self):
+ line1 = _content_line("abc")
+ line2 = _content_line("de")
+ result = layout_content_lines((line1, line2), 80, 0)
+
+ self.assertEqual(len(result.wrapped_rows), 2)
+ self.assertEqual(result.wrapped_rows[0].buffer_advance, 4) # abc + \n
+ self.assertEqual(result.wrapped_rows[1].buffer_advance, 3) # de + \n
+
+ def test_leading_prompt_lines(self):
+ leading = (ContentFragment("header", 6),)
+ prompt = PromptContent(leading, ">>> ", 4)
+ body = _body_from_text("x")
+ source = _source("x", has_newline=False)
+ line = ContentLine(source, prompt, body)
+ result = layout_content_lines((line,), 80, 0)
+
+ # Leading line + body line
+ self.assertEqual(len(result.wrapped_rows), 2)
+ # Leading row has the fragment but no prompt
+ self.assertEqual(result.wrapped_rows[0].fragments, leading)
+ # Body row has prompt and content
+ self.assertEqual(result.wrapped_rows[1].prompt_text, ">>> ")
+
+ def test_wrapped_line_layout_rows_have_suffix(self):
+ line = _content_line("abcdefgh")
+ result = layout_content_lines((line,), 10, 0)
+
+ first_layout = result.layout_map.rows[0]
+ self.assertEqual(first_layout.suffix_width, 1)
+ second_layout = result.layout_map.rows[1]
+ self.assertEqual(second_layout.suffix_width, 0)
+
+ def test_pos_to_xy_through_layout(self):
+ line = _content_line("abc")
+ result = layout_content_lines((line,), 80, 0)
+ lm = result.layout_map
+
+ self.assertEqual(lm.pos_to_xy(0), (4, 0)) # after prompt
+ self.assertEqual(lm.pos_to_xy(1), (5, 0))
+ self.assertEqual(lm.pos_to_xy(2), (6, 0))
+ self.assertEqual(lm.pos_to_xy(3), (7, 0)) # end of line
import subprocess
import sys
import tempfile
+from functools import partial
from pkgutil import ModuleInfo
from unittest import TestCase, skipUnless, skipIf, SkipTest
from unittest.mock import Mock, patch
self.assert_screen_equal(reader, expected, clean=True)
self.assertEqual(output, expected)
+ def test_up_arrow_stays_within_recalled_multiline_entry(self):
+ code = (
+ "def fo():\n"
+ "...\n"
+ "...\n"
+ "a = 1\n"
+ "b = 2\n"
+ "x = 1\n"
+ "\n"
+ "def fo():\n"
+ "...\n"
+ "...\n"
+ "a = 1\n"
+ "b = 2\n"
+ "x = 1\n"
+ "z = 2\n"
+ "\n"
+ )
+ events = list(itertools.chain(
+ code_to_events(code),
+ [
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+ ]
+ ))
+
+ reader = self.prepare_reader(events)
+ multiline_input(reader)
+ multiline_input(reader)
+
+ expected = (
+ "def fo():\n"
+ " ...\n"
+ " ...\n"
+ " a = 1\n"
+ " b = 2\n"
+ " x = 1\n"
+ " z = 2"
+ )
+ reader.more_lines = partial(more_lines, namespace=None)
+ reader.ps1 = reader.ps2 = ">>> "
+ reader.ps3 = reader.ps4 = "... "
+ try:
+ reader.prepare()
+ reader.refresh()
+
+ reader.handle1()
+ self.assertEqual(reader.historyi, 1)
+ self.assertEqual(reader.get_unicode(), expected)
+ first_cxy = reader.cxy
+
+ reader.handle1()
+ self.assertEqual(reader.historyi, 1)
+ self.assertEqual(reader.get_unicode(), expected)
+ self.assertLess(reader.cxy[1], first_cxy[1])
+ finally:
+ reader.restore()
+
def test_history_navigation_with_down_arrow(self):
events = itertools.chain(
self.assertEqual(output, "1+1")
self.assert_screen_equal(reader, "1+1", clean=True)
+ def test_history_file_embedded_nuls_are_sanitized(self):
+ reader = self.prepare_reader([])
+ wrapper = _ReadlineWrapper(reader=reader, f_in=0, f_out=1)
+ with tempfile.NamedTemporaryFile("wb", delete=False) as history_file:
+ history_file.write(b"good\n")
+ history_file.write(b"ba\0d\n")
+ history_file.write(b"line1\r\nline2\0\n")
+ filename = history_file.name
+
+ try:
+ wrapper.read_history_file(filename)
+ finally:
+ unlink(filename)
+
+ self.assertEqual(reader.history, ["good", "bad", "line1\nline2"])
+
def test_control_character(self):
events = code_to_events("c\x1d\n")
reader = self.prepare_reader(events)
output = multiline_input(reader)
self.assertEqual(output, "c\x1d")
- self.assert_screen_equal(reader, "c\x1d", clean=True)
+ self.assert_screen_equal(reader, "c^]", clean=True)
def test_history_search_backward(self):
# Test <page up> history search backward with "imp" input
class TestWindowsConsoleEolWrap(TestCase):
def _make_mock_console(self, width=80):
from _pyrepl import windows_console as wc
+ from _pyrepl.render import RenderedScreen
console = object.__new__(wc.WindowsConsole)
console.width = width
console.posxy = (0, 0)
- console.screen = [""]
+ console._rendered_screen = RenderedScreen.from_screen_lines([""], (0, 0))
console._hide_cursor = Mock()
console._show_cursor = Mock()
return console, wc
+ def _apply_changed_line(self, console, wc, y, old_line, new_line, px=0):
+ from _pyrepl.render import RenderLine
+
+ old_render = RenderLine.from_rendered_text(old_line)
+ new_render = RenderLine.from_rendered_text(new_line)
+ update = wc.WindowsConsole._WindowsConsole__plan_changed_line(
+ console, y, old_render, new_render, px
+ )
+ if update is not None:
+ wc.WindowsConsole._WindowsConsole__apply_line_update(
+ console, update
+ )
+
def test_short_line_sets_posxy_normally(self):
width = 10
y = 3
console, wc = self._make_mock_console(width=width)
- old_line = ""
- new_line = "a" * 3
- wc.WindowsConsole._WindowsConsole__write_changed_line(
- console, y, old_line, new_line, 0
- )
+ self._apply_changed_line(console, wc, y, "", "a" * 3)
self.assertEqual(console.posxy, (3, y))
def test_exact_width_line_does_not_wrap(self):
width = 10
y = 3
console, wc = self._make_mock_console(width=width)
- old_line = ""
- new_line = "a" * width
-
- wc.WindowsConsole._WindowsConsole__write_changed_line(
- console, y, old_line, new_line, 0
- )
+ self._apply_changed_line(console, wc, y, "", "a" * width)
self.assertEqual(console.posxy, (width - 1, y))
from .support import ScreenEqualMixin, code_to_events
from .support import prepare_reader, prepare_console
from _pyrepl.console import Event
+from _pyrepl.layout import LayoutMap
+from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
from _pyrepl.reader import Reader
from _colorize import default_theme
colors = {overrides.get(k, k[0].lower()): v for k, v in default_theme.syntax.items()}
+def prepare_reader_multiline_prompt(*args, **kwargs):
+ reader = prepare_reader(*args, **kwargs)
+ del reader.get_prompt
+ reader.ps1 = "Python 3.15\n>>> "
+ reader.ps2 = "Python 3.15\n>>> "
+ reader.ps3 = "Python 3.15\n... "
+ reader.ps4 = "Python 3.15\n... "
+ reader.can_colorize = False
+ reader.paste_mode = False
+ return reader
+
+
@force_not_colorized_test_class
class TestReader(ScreenEqualMixin, TestCase):
+ def assert_multiline_prompt_screen(self, code, expected_screen, expected_cxy):
+ reader, _ = handle_all_events(
+ code_to_events(code),
+ prepare_reader=prepare_reader_multiline_prompt,
+ )
+
+ self.assertEqual(reader.screen, expected_screen)
+ self.assertEqual(reader.cxy, expected_cxy)
+
+ def test_multiline_prompt_does_not_duplicate_leading_lines(self):
+ self.assert_multiline_prompt_screen(
+ "abc",
+ ["Python 3.15", ">>> abc"],
+ (7, 1),
+ )
+
+ def test_multiline_prompt_does_not_duplicate_leading_lines_across_buffer_lines(self):
+ self.assert_multiline_prompt_screen(
+ "if x:\n y",
+ [
+ "Python 3.15",
+ ">>> if x:",
+ "Python 3.15",
+ "... y",
+ ],
+ (13, 3),
+ )
+
def test_calc_screen_wrap_simple(self):
events = code_to_events(10 * "a")
reader, _ = handle_events_narrow_console(events)
reader, _ = handle_all_events(events)
self.assert_screen_equal(reader, "aa")
+ def test_refresh_escapes_control_bytes_in_buffer(self):
+ console = prepare_console(())
+ config = ReadlineConfig(readline_completer=None)
+ reader = ReadlineAlikeReader(console=console, config=config)
+ reader.can_colorize = False
+ reader.ps1 = reader.ps2 = ">>> "
+ reader.ps3 = reader.ps4 = "... "
+ reader.buffer = ["\x00", "\x1b"]
+ reader.pos = len(reader.buffer)
+ reader.invalidate_full()
+
+ reader.refresh()
+
+ self.assert_screen_equal(reader, ">>> ^@^[")
+ self.assertEqual(reader.cxy, (8, 0))
+
def test_calc_screen_wrap_removes_after_backspace(self):
events = itertools.chain(
code_to_events(10 * "a"),
)
reader, _ = handle_all_events(events)
- self.assert_screen_equal(reader, "")
+ self.assertIn(reader.screen, ([], [""]))
def test_newline_within_block_trailing_whitespace(self):
# fmt: off
self.assertEqual(prompt, "\033[0;32m樂>\033[0m> ")
self.assertEqual(l, 5)
+ def test_prepare_with_zero_width_does_not_crash(self):
+ console = prepare_console([], width=0)
+ reader = ReadlineAlikeReader(console=console, config=ReadlineConfig())
+ reader.ps1 = ">>> "
+ reader.ps2 = ">>> "
+ reader.ps3 = "... "
+ reader.ps4 = ""
+ reader.can_colorize = False
+ reader.paste_mode = False
+
+ reader.prepare()
+
+ self.assertEqual(reader.cxy, (0, 0))
+ self.assertEqual(reader.screen, [])
+
def test_completions_updated_on_key_press(self):
namespace = {"itertools": itertools}
code = "itertools."
def test_pos2xy_with_no_columns(self):
console = prepare_console([])
reader = prepare_reader(console)
- # Simulate a resize to 0 columns
- reader.screeninfo = []
+ reader.layout = LayoutMap(())
self.assertEqual(reader.pos2xy(), (0, 0))
def test_setpos_from_xy_for_non_printing_char(self):
--- /dev/null
+from unittest import TestCase
+from _pyrepl.render import (
+ LineUpdate,
+ RenderCell,
+ RenderLine,
+ RenderedScreen,
+ ScreenOverlay,
+ StyleRef,
+ diff_render_lines,
+ render_cells,
+ requires_cursor_resync,
+)
+
+
+class TestRenderLine(TestCase):
+ def test_from_rendered_text_parses_style_state(self):
+ line = RenderLine.from_rendered_text("\x1b[31ma\x1b[0mb")
+
+ self.assertEqual(line.width, 2)
+ self.assertEqual([cell.text for cell in line.cells], ["a", "b"])
+ self.assertEqual([cell.style.sgr for cell in line.cells], ["\x1b[31m", ""])
+ self.assertEqual([cell.controls for cell in line.cells], [(), ()])
+
+ def test_from_rendered_text_round_trips_trailing_reset(self):
+ line = RenderLine.from_rendered_text("\x1b[31ma\x1b[0m")
+
+ self.assertEqual([cell.text for cell in line.cells], ["a"])
+ self.assertEqual([cell.style.sgr for cell in line.cells], ["\x1b[31m"])
+ self.assertEqual(line.text, "\x1b[31ma\x1b[0m")
+
+ def test_from_parts_accepts_style_refs(self):
+ line = RenderLine.from_parts(
+ ["d", "e", "f", " "],
+ [1, 1, 1, 1],
+ [StyleRef.from_sgr("\x1b[1;34m")] * 3 + [None],
+ )
+
+ self.assertEqual([cell.text for cell in line.cells], ["d", "e", "f", " "])
+ self.assertEqual(
+ [cell.style.sgr for cell in line.cells],
+ ["\x1b[1;34m", "\x1b[1;34m", "\x1b[1;34m", ""],
+ )
+ self.assertEqual(line.text, "\x1b[1;34mdef\x1b[0m ")
+
+ def test_from_parts_without_styles(self):
+ line = RenderLine.from_parts(["a", "b"], [1, 1])
+
+ self.assertEqual(line.text, "ab")
+ self.assertEqual(line.width, 2)
+
+ def test_from_rendered_text_empty_string(self):
+ line = RenderLine.from_rendered_text("")
+
+ self.assertEqual(line.cells, ())
+ self.assertEqual(line.text, "")
+ self.assertEqual(line.width, 0)
+
+ def test_from_rendered_text_with_non_sgr_controls(self):
+ # \x1b[H is a cursor-home control (not SGR since it doesn't end with 'm')
+ line = RenderLine.from_rendered_text("\x1b[Hx")
+
+ self.assertEqual(len(line.cells), 2)
+ self.assertEqual(line.cells[0].controls, ("\x1b[H",))
+ self.assertEqual(line.cells[0].text, "")
+ self.assertEqual(line.cells[1].text, "x")
+
+ def test_from_rendered_text_trailing_non_sgr_control(self):
+ line = RenderLine.from_rendered_text("a\x1b[K")
+
+ self.assertEqual(len(line.cells), 2)
+ self.assertEqual(line.cells[0].text, "a")
+ self.assertEqual(line.cells[1].controls, ("\x1b[K",))
+ self.assertEqual(line.cells[1].text, "")
+
+ def test_from_rendered_text_non_sgr_before_text(self):
+ # Non-SGR control immediately before text should produce a control cell
+ # then text cells.
+ line = RenderLine.from_rendered_text("\x1b[Kab")
+
+ texts = [c.text for c in line.cells]
+ self.assertEqual(texts, ["", "a", "b"])
+ self.assertEqual(line.cells[0].controls, ("\x1b[K",))
+
+
+class TestLineDiff(TestCase):
+ def test_diff_render_lines_ignores_unchanged_ansi_prefix(self):
+ old = RenderLine.from_rendered_text("\x1b[31ma\x1b[0mb")
+ new = RenderLine.from_rendered_text("\x1b[31ma\x1b[0mc")
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ self.assertEqual(diff.start_x, 1)
+ self.assertEqual(diff.old_text, "b")
+ self.assertEqual(diff.new_text, "c")
+
+ def test_diff_render_lines_detects_single_cell_insertion(self):
+ old = RenderLine.from_rendered_text("ab")
+ new = RenderLine.from_rendered_text("acb")
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ self.assertEqual(diff.start_x, 1)
+ self.assertEqual(diff.old_text, "")
+ self.assertEqual(diff.new_text, "c")
+
+ def test_colored_append_only_emits_new_character_and_reset(self):
+ old = RenderLine.from_rendered_text("\x1b[1mabc\x1b[0m")
+ new = RenderLine.from_rendered_text("\x1b[1mabcd\x1b[0m")
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ self.assertEqual(diff.start_x, 3)
+ self.assertEqual(render_cells(diff.new_cells), "\x1b[1md\x1b[0m")
+
+ def test_keyword_space_inserts_only_space_after_reset(self):
+ old = RenderLine.from_parts(
+ ["d", "e", "f"],
+ [1, 1, 1],
+ ["keyword", "keyword", "keyword"],
+ )
+ new = RenderLine.from_parts(
+ ["d", "e", "f", " "],
+ [1, 1, 1, 1],
+ ["keyword", "keyword", "keyword", None],
+ )
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ self.assertEqual(diff.start_x, 3)
+ self.assertEqual(render_cells(diff.new_cells), " ")
+
+ def test_diff_render_lines_returns_none_for_identical(self):
+ line = RenderLine.from_rendered_text("abc")
+ self.assertIsNone(diff_render_lines(line, line))
+
+ def test_diff_render_lines_breaks_on_controls_in_prefix(self):
+ old = RenderLine.from_cells([
+ RenderCell("a", 1),
+ RenderCell("", 0, controls=("\x1b[K",)),
+ RenderCell("b", 1),
+ ])
+ new = RenderLine.from_cells([
+ RenderCell("a", 1),
+ RenderCell("", 0, controls=("\x1b[K",)),
+ RenderCell("c", 1),
+ ])
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ # Prefix scan stops at control cell, so diff starts at cell 1
+ self.assertEqual(diff.start_cell, 1)
+ self.assertEqual(diff.start_x, 1)
+
+ def test_diff_render_lines_extends_past_combining_chars(self):
+ # \u0301 is a combining acute accent (zero-width)
+ old = RenderLine.from_parts(["a", "b", "\u0301"], [1, 1, 0])
+ new = RenderLine.from_parts(["a", "c", "\u0301"], [1, 1, 0])
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ # The combining char is included since it's zero-width
+ self.assertEqual(len(diff.new_cells), 2)
+
+ def test_diff_old_and_new_changed_width(self):
+ old = RenderLine.from_rendered_text("ab")
+ new = RenderLine.from_rendered_text("acd")
+
+ diff = diff_render_lines(old, new)
+
+ self.assertIsNotNone(diff)
+ assert diff is not None
+ self.assertEqual(diff.old_changed_width, 1)
+ self.assertEqual(diff.new_changed_width, 2)
+
+ def test_rendered_screen_round_trips_screen_lines(self):
+ screen = RenderedScreen.from_screen_lines(
+ ["a", "\x1b[31mb\x1b[0m"],
+ (0, 1),
+ )
+
+ self.assertEqual(screen.screen_lines, ("a", "\x1b[31mb\x1b[0m"))
+
+
+class TestRenderedScreen(TestCase):
+ def test_empty(self):
+ screen = RenderedScreen.empty()
+
+ self.assertEqual(screen.lines, ())
+ self.assertEqual(screen.cursor, (0, 0))
+ self.assertEqual(screen.overlays, ())
+ self.assertEqual(screen.composed_lines, ())
+
+ def test_with_overlay(self):
+ screen = RenderedScreen.from_screen_lines(["aaa", "bbb"], (0, 0))
+ overlay_line = RenderLine.from_rendered_text("xxx")
+ result = screen.with_overlay(1, [overlay_line])
+
+ self.assertEqual(len(result.overlays), 1)
+ self.assertEqual(result.composed_lines[0].text, "aaa")
+ self.assertEqual(result.composed_lines[1].text, "xxx")
+
+ def test_compose_replace_overlay(self):
+ base = RenderedScreen.from_screen_lines(["aa", "bb", "cc"], (0, 0))
+ overlay_line = RenderLine.from_rendered_text("XX")
+ screen = RenderedScreen(
+ base.lines,
+ (0, 0),
+ (ScreenOverlay(y=1, lines=(overlay_line,)),),
+ )
+
+ self.assertEqual(screen.composed_lines[0].text, "aa")
+ self.assertEqual(screen.composed_lines[1].text, "XX")
+ self.assertEqual(screen.composed_lines[2].text, "cc")
+
+ def test_compose_insert_overlay(self):
+ base = RenderedScreen.from_screen_lines(["aa", "bb"], (0, 0))
+ overlay_line = RenderLine.from_rendered_text("INS")
+ screen = RenderedScreen(
+ base.lines,
+ (0, 0),
+ (ScreenOverlay(y=1, lines=(overlay_line,), insert=True),),
+ )
+
+ self.assertEqual(len(screen.composed_lines), 3)
+ self.assertEqual(screen.composed_lines[0].text, "aa")
+ self.assertEqual(screen.composed_lines[1].text, "INS")
+ self.assertEqual(screen.composed_lines[2].text, "bb")
+
+ def test_compose_replace_extends_beyond_lines(self):
+ base = RenderedScreen.from_screen_lines(["aa"], (0, 0))
+ overlay_line = RenderLine.from_rendered_text("XX")
+ screen = RenderedScreen(
+ base.lines,
+ (0, 0),
+ (ScreenOverlay(y=1, lines=(overlay_line,)),),
+ )
+
+ self.assertEqual(len(screen.composed_lines), 2)
+ self.assertEqual(screen.composed_lines[1].text, "XX")
+
+
+class TestRenderCell(TestCase):
+ def test_terminal_text(self):
+ cell = RenderCell("x", 1, style=StyleRef.from_sgr("\x1b[32m"))
+ self.assertEqual(cell.terminal_text, "\x1b[32mx\x1b[0m")
+
+ def test_terminal_text_plain(self):
+ cell = RenderCell("y", 1)
+ self.assertEqual(cell.terminal_text, "y")
+
+
+class TestRenderCells(TestCase):
+ def test_render_cells_with_controls(self):
+ cells = [
+ RenderCell("", 0, controls=("\x1b[K",)),
+ RenderCell("a", 1),
+ ]
+ result = render_cells(cells)
+ self.assertEqual(result, "\x1b[Ka")
+
+ def test_render_cells_skips_empty_text(self):
+ cells = [
+ RenderCell("", 0),
+ RenderCell("a", 1),
+ ]
+ result = render_cells(cells)
+ self.assertEqual(result, "a")
+
+ def test_render_cells_with_visual_style(self):
+ cells = [RenderCell("a", 1)]
+ result = render_cells(cells, visual_style="\x1b[7m")
+ self.assertEqual(result, "\x1b[7ma\x1b[0m")
+
+
+class TestLineUpdate(TestCase):
+ def test_post_init_renders_text(self):
+ cells = (RenderCell("a", 1), RenderCell("b", 1))
+ update = LineUpdate(
+ kind="insert_char",
+ y=0,
+ start_cell=0,
+ start_x=0,
+ cells=cells,
+ )
+ self.assertEqual(update.text, "ab")
+
+
+class TestRequiresCursorResync(TestCase):
+ def test_no_controls(self):
+ cells = [RenderCell("a", 1)]
+ self.assertFalse(requires_cursor_resync(cells))
+
+ def test_sgr_only_does_not_require_resync(self):
+ cells = [RenderCell("", 0, controls=("\x1b[31m",))]
+ self.assertFalse(requires_cursor_resync(cells))
+
+ def test_non_sgr_requires_resync(self):
+ cells = [RenderCell("", 0, controls=("\x1b[H",))]
+ self.assertTrue(requires_cursor_resync(cells))
import threading
import unittest
from functools import partial
-from test.support import os_helper, force_not_colorized_test_class
+from test.support import force_color, os_helper, force_not_colorized_test_class
from test.support import threading_helper
from unittest import TestCase
from unittest.mock import MagicMock, call, patch, ANY, Mock
-from .support import handle_all_events, code_to_events
+from .support import handle_all_events, code_to_events, more_lines
try:
from _pyrepl.console import Event
@patch("os.write")
@force_not_colorized_test_class
class TestConsole(TestCase):
+ @staticmethod
+ def _prepare_reader_with_prompts(console, **kwargs):
+ from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
+
+ config = ReadlineConfig(
+ readline_completer=kwargs.pop("readline_completer", None)
+ )
+ reader = ReadlineAlikeReader(console=console, config=config)
+ reader.paste_mode = False
+ for key, val in kwargs.items():
+ setattr(reader, key, val)
+ return reader
+
+ def test_colorized_multiline_typing_does_not_redraw_previous_line(self, _os_write):
+ def prepare_reader_with_prompts(console, **kwargs):
+ reader = self._prepare_reader_with_prompts(console, **kwargs)
+ reader.more_lines = partial(more_lines, namespace=None)
+ return reader
+
+ with force_color(True):
+ events = itertools.chain(
+ code_to_events("def foo():"),
+ [Event(evt="key", data="\n", raw=bytearray(b"\n"))],
+ code_to_events("x = 1"),
+ [Event(evt="key", data="\n", raw=bytearray(b"\n"))],
+ code_to_events("y"),
+ )
+ _, con = handle_all_events(
+ events,
+ prepare_console=unix_console,
+ prepare_reader=prepare_reader_with_prompts,
+ )
+ con.restore()
+
+ self.assertNotIn(
+ call(ANY, b" \x1b[0m x \x1b[0m=\x1b[0m "),
+ _os_write.mock_calls,
+ )
+ self.assertIn(call(ANY, b"y"), _os_write.mock_calls)
+
def test_no_newline(self, _os_write):
code = "1"
events = code_to_events(code)
def _mock_console_init(self, f_in=0, f_out=1, term="", encoding="utf-8"):
"""Mock __init__ to avoid real Windows API calls in headless environments."""
super(WindowsConsole, self).__init__(f_in, f_out, term, encoding)
- self.screen = []
self.width = 80
self.height = 25
self._WindowsConsole__offset = 0
call(self.move_left(5)),
call(self.move_up()),
call(b"def f():"),
- call(self.move_left(3)),
+ call(self.move_left(8)),
+ call(self.move_right(5)),
call(self.move_down()),
]
)