import importlib
import os
import pkgutil
+import re
import sys
import token
import tokenize
TYPE_CHECKING = False
if TYPE_CHECKING:
+ from types import ModuleType
from typing import Any, Iterable, Iterator, Mapping
+ from .types import CompletionAction
HARDCODED_SUBMODULES = {
"xml.parsers.expat": ["errors", "model"],
}
+AUTO_IMPORT_DENYLIST = {
+ # Standard library modules/submodules that have import side effects
+ # and must not be automatically imported to complete attributes
+ re.compile(r"antigravity"), # Calls webbrowser.open
+ re.compile(r"idlelib\..+"), # May open IDLE GUI
+ re.compile(r"test\..+"), # Various side-effects
+ re.compile(r"this"), # Prints to stdout
+ re.compile(r"_ios_support"), # Spawns a subprocess
+ re.compile(r".+\.__main__"), # Should not be imported
+}
+
def make_default_module_completer() -> ModuleCompleter:
# Inside pyrepl, __package__ is set to None by default
def __init__(self, namespace: Mapping[str, Any] | None = None) -> None:
self.namespace = namespace or {}
self._global_cache: list[pkgutil.ModuleInfo] = []
+ self._failed_imports: set[str] = set()
self._curr_sys_path: list[str] = sys.path[:]
self._stdlib_path = os.path.dirname(importlib.__path__[0])
- def get_completions(self, line: str) -> list[str] | None:
- """Return the next possible import completions for 'line'."""
+ def get_completions(self, line: str) -> tuple[list[str], CompletionAction | None] | None:
+ """Return the next possible import completions for 'line'.
+
+ For attributes completion, if the module to complete from is not
+ imported, also return an action (prompt + callback to run if the
+ user press TAB again) to import the module.
+ """
result = ImportParser(line).parse()
if not result:
return None
except Exception:
# Some unexpected error occurred, make it look like
# no completions are available
- return []
+ return [], None
- def complete(self, from_name: str | None, name: str | None) -> list[str]:
+ def complete(self, from_name: str | None, name: str | None) -> tuple[list[str], CompletionAction | None]:
if from_name is None:
# import x.y.z<tab>
assert name is not None
path, prefix = self.get_path_and_prefix(name)
modules = self.find_modules(path, prefix)
- return [self.format_completion(path, module) for module in modules]
+ return [self.format_completion(path, module) for module in modules], None
if name is None:
# from x.y.z<tab>
path, prefix = self.get_path_and_prefix(from_name)
modules = self.find_modules(path, prefix)
- return [self.format_completion(path, module) for module in modules]
+ return [self.format_completion(path, module) for module in modules], None
# from x.y import z<tab>
- return self.find_modules(from_name, name)
+ submodules = self.find_modules(from_name, name)
+ attributes, action = self.find_attributes(from_name, name)
+ return sorted({*submodules, *attributes}), action
def find_modules(self, path: str, prefix: str) -> list[str]:
"""Find all modules under 'path' that start with 'prefix'."""
if self.is_suggestion_match(module.name, prefix)]
return sorted(builtin_modules + third_party_modules)
- if path.startswith('.'):
- # Convert relative path to absolute path
- package = self.namespace.get('__package__', '')
- path = self.resolve_relative_name(path, package) # type: ignore[assignment]
- if path is None:
- return []
+ path = self._resolve_relative_path(path) # type: ignore[assignment]
+ if path is None:
+ return []
modules: Iterable[pkgutil.ModuleInfo] = self.global_cache
imported_module = sys.modules.get(path.split('.')[0])
if imported_module:
- # Filter modules to those who name and specs match the
+ # Filter modules to those whose name and specs match the
# imported module to avoid invalid suggestions
spec = imported_module.__spec__
if spec:
+ def _safe_find_spec(mod: pkgutil.ModuleInfo) -> bool:
+ try:
+ return mod.module_finder.find_spec(mod.name, None) == spec
+ except Exception:
+ return False
modules = [mod for mod in modules
if mod.name == spec.name
- and mod.module_finder.find_spec(mod.name, None) == spec]
+ and _safe_find_spec(mod)]
else:
modules = []
return (isinstance(module_info.module_finder, FileFinder)
and module_info.module_finder.path == self._stdlib_path)
+ def find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]:
+ """Find all attributes of module 'path' that start with 'prefix'."""
+ attributes, action = self._find_attributes(path, prefix)
+ # Filter out invalid attribute names
+ # (for example those containing dashes that cannot be imported with 'import')
+ return [attr for attr in attributes if attr.isidentifier()], action
+
+ def _find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]:
+ path = self._resolve_relative_path(path) # type: ignore[assignment]
+ if path is None:
+ return [], None
+
+ imported_module = sys.modules.get(path)
+ if not imported_module:
+ if path in self._failed_imports: # Do not propose to import again
+ return [], None
+ imported_module = self._maybe_import_module(path)
+ if not imported_module:
+ return [], self._get_import_completion_action(path)
+ try:
+ module_attributes = dir(imported_module)
+ except Exception:
+ module_attributes = []
+ return [attr_name for attr_name in module_attributes
+ if self.is_suggestion_match(attr_name, prefix)], None
+
def is_suggestion_match(self, module_name: str, prefix: str) -> bool:
if prefix:
return module_name.startswith(prefix)
return f'{path}{module}'
return f'{path}.{module}'
+ def _resolve_relative_path(self, path: str) -> str | None:
+ """Resolve a relative import path to absolute. Returns None if unresolvable."""
+ if path.startswith('.'):
+ package = self.namespace.get('__package__', '')
+ return self.resolve_relative_name(path, package)
+ return path
+
def resolve_relative_name(self, name: str, package: str) -> str | None:
"""Resolve a relative module name to an absolute name.
if not self._global_cache or self._curr_sys_path != sys.path:
self._curr_sys_path = sys.path[:]
self._global_cache = list(pkgutil.iter_modules())
+ self._failed_imports.clear() # retry on sys.path change
return self._global_cache
+ def _maybe_import_module(self, fqname: str) -> ModuleType | None:
+ if any(pattern.fullmatch(fqname) for pattern in AUTO_IMPORT_DENYLIST):
+ # Special-cased modules with known import side-effects
+ return None
+ root = fqname.split(".")[0]
+ mod_info = next((m for m in self.global_cache if m.name == root), None)
+ if not mod_info or not self._is_stdlib_module(mod_info):
+ # Only import stdlib modules (no risk of import side-effects)
+ return None
+ try:
+ return importlib.import_module(fqname)
+ except Exception:
+ sys.modules.pop(fqname, None) # Clean half-imported module
+ return None
+
+ def _get_import_completion_action(self, path: str) -> CompletionAction:
+ prompt = ("[ module not imported, press again to import it "
+ "and propose attributes ]")
+
+ def _do_import() -> str | None:
+ try:
+ importlib.import_module(path)
+ return None
+ except Exception as exc:
+ sys.modules.pop(path, None) # Clean half-imported module
+ self._failed_imports.add(path)
+ return f"[ error during import: {exc} ]"
+
+ return (prompt, _do_import)
+
class ImportParser:
"""
+import contextlib
import importlib
import io
import itertools
from pkgutil import ModuleInfo
from unittest import TestCase, skipUnless, skipIf, SkipTest
from unittest.mock import Mock, patch
-from test.support import force_not_colorized, make_clean_env, Py_DEBUG
+import warnings
+from test.support import (
+ captured_stdout,
+ captured_stderr,
+ force_not_colorized,
+ make_clean_env,
+ Py_DEBUG,
+)
from test.support import has_subprocess_support, SHORT_TIMEOUT, STDLIB_DIR
from test.support.import_helper import import_module
from test.support.os_helper import EnvironmentVarGuard, unlink
import readline as readline_module
except ImportError:
readline_module = None
+try:
+ import tkinter
+except ImportError:
+ tkinter = None
class ReplTestCase(TestCase):
reader = ReadlineAlikeReader(console=console, config=config)
return reader
- def test_import_completions(self):
+ @patch.dict(sys.modules,
+ {"importlib.resources": object()}) # don't propose to import it
+ def test_completions(self):
cases = (
("import path\t\n", "import pathlib"),
("import importlib.\t\tres\t\n", "import importlib.resources"),
# Return public methods by default
("from foo import \t\n", "from foo import public"),
# Return private methods if explicitly specified
- ("from foo import _\t\n", "from foo import _private"),
+ ("from foo import _p\t\n", "from foo import _private"),
)
for code, expected in cases:
with self.subTest(code=code):
output = reader.readline()
self.assertEqual(output, expected)
- def test_relative_import_completions(self):
+ def test_relative_completions(self):
cases = (
(None, "from .readl\t\n", "from .readl"),
(None, "from . import readl\t\n", "from . import readl"),
("_pyrepl", "from .readl\t\n", "from .readline"),
("_pyrepl", "from . import readl\t\n", "from . import readline"),
+ ("_pyrepl", "from .readline import mul\t\n", "from .readline import multiline_input"),
("_pyrepl", "from .. import toodeep\t\n", "from .. import toodeep"),
("concurrent", "from .futures.i\t\n", "from .futures.interpreter"),
)
cases = (
("import pri\t\n", "import pri"),
("from pri\t\n", "from pri"),
- ("from typing import Na\t\n", "from typing import Na"),
+ ("from typong import Na\t\n", "from typong import Na"),
)
for code, expected in cases:
with self.subTest(code=code):
with (tempfile.TemporaryDirectory() as _dir1,
patch.object(sys, "path", [_dir1, *sys.path])):
dir1 = pathlib.Path(_dir1)
- (dir1 / "mod_aa.py").mkdir()
- (dir1 / "mod_bb.py").mkdir()
+ (dir1 / "mod_aa.py").touch()
+ (dir1 / "mod_bb.py").touch()
events = code_to_events("import mod_a\t\nimport mod_b\t\n")
reader = self.prepare_reader(events, namespace={})
output_1, output_2 = reader.readline(), reader.readline()
def test_hardcoded_stdlib_submodules(self):
cases = (
("import collections.\t\n", "import collections.abc"),
- ("from os import \t\n", "from os import path"),
+ ("import os.\t\n", "import os.path"),
("import math.\t\n", "import math.integer"),
("import xml.parsers.expat.\t\te\t\n\n", "import xml.parsers.expat.errors"),
("from xml.parsers.expat import \t\tm\t\n\n", "from xml.parsers.expat import model"),
self.assertEqual(output, f"import {mod}.")
del sys.modules[mod]
+ @patch.dict(sys.modules)
+ def test_attribute_completion(self):
+ with tempfile.TemporaryDirectory() as _dir:
+ dir = pathlib.Path(_dir)
+ (dir / "foo.py").write_text("bar = 42")
+ (dir / "bar.py").write_text("baz = 42")
+ (dir / "pack").mkdir()
+ (dir / "pack" / "__init__.py").write_text("attr = 42")
+ (dir / "pack" / "foo.py").touch()
+ (dir / "pack" / "bar.py").touch()
+ (dir / "pack" / "baz.py").touch()
+ sys.modules.pop("graphlib", None) # test modules may have been imported by previous tests
+ sys.modules.pop("antigravity", None)
+ sys.modules.pop("unittest.__main__", None)
+ with patch.object(sys, "path", [_dir, *sys.path]):
+ pkgutil.get_importer(_dir).invalidate_caches()
+ importlib.import_module("bar")
+ cases = (
+ # needs 2 tabs to import (show prompt, then import)
+ ("from foo import \t\n", "from foo import ", set()),
+ ("from foo import \t\t\n", "from foo import bar", {"foo"}),
+ ("from foo import ba\t\n", "from foo import ba", set()),
+ ("from foo import ba\t\t\n", "from foo import bar", {"foo"}),
+ # reset if a character is inserted between tabs
+ ("from foo import \tb\ta\t\n", "from foo import ba", set()),
+ # packages: needs 3 tabs ([ not unique ], prompt, import)
+ ("from pack import \t\t\n", "from pack import ", set()),
+ ("from pack import \t\t\t\n", "from pack import ", {"pack"}),
+ ("from pack import \t\t\ta\t\n", "from pack import attr", {"pack"}),
+ # one match: needs 2 tabs (insert + show prompt, import)
+ ("from pack import f\t\n", "from pack import foo", set()),
+ ("from pack import f\t\t\n", "from pack import foo", {"pack"}),
+ # common prefix: needs 3 tabs (insert + [ not unique ], prompt, import)
+ ("from pack import b\t\n", "from pack import ba", set()),
+ ("from pack import b\t\t\n", "from pack import ba", set()),
+ ("from pack import b\t\t\t\n", "from pack import ba", {"pack"}),
+ # module already imported
+ ("from bar import b\t\n", "from bar import baz", set()),
+ # stdlib modules are automatically imported
+ ("from graphlib import T\t\n", "from graphlib import TopologicalSorter", {"graphlib"}),
+ # except those with known side-effects
+ ("from antigravity import g\t\n", "from antigravity import g", set()),
+ ("from unittest.__main__ import \t\n", "from unittest.__main__ import ", set()),
+ )
+ for code, expected, expected_imports in cases:
+ with self.subTest(code=code), patch.dict(sys.modules):
+ _imported = set(sys.modules.keys())
+ events = code_to_events(code)
+ reader = self.prepare_reader(events, namespace={})
+ output = reader.readline()
+ self.assertEqual(output, expected)
+ new_imports = sys.modules.keys() - _imported
+ self.assertEqual(new_imports, expected_imports)
+
+ @patch.dict(sys.modules)
+ def test_attribute_completion_error_on_import(self):
+ with tempfile.TemporaryDirectory() as _dir:
+ dir = pathlib.Path(_dir)
+ (dir / "foo.py").write_text("bar = 42")
+ (dir / "boom.py").write_text("1 <> 2")
+ with patch.object(sys, "path", [_dir, *sys.path]):
+ cases = (
+ ("from boom import \t\t\n", "from boom import "),
+ ("from foo import \t\t\n", "from foo import bar"), # still working
+ )
+ for code, expected in cases:
+ with self.subTest(code=code):
+ events = code_to_events(code)
+ reader = self.prepare_reader(events, namespace={})
+ output = reader.readline()
+ self.assertEqual(output, expected)
+ self.assertNotIn("boom", sys.modules)
+
+ @patch.dict(sys.modules)
+ def test_attribute_completion_error_on_attributes_access(self):
+ with tempfile.TemporaryDirectory() as _dir:
+ dir = pathlib.Path(_dir)
+ (dir / "boom").mkdir()
+ (dir / "boom"/"__init__.py").write_text("def __dir__(): raise ValueError()")
+ (dir / "boom"/"submodule.py").touch()
+ with patch.object(sys, "path", [_dir, *sys.path]):
+ events = code_to_events("from boom import \t\t\n") # trigger import
+ reader = self.prepare_reader(events, namespace={})
+ output = reader.readline()
+ self.assertIn("boom", sys.modules)
+ # ignore attributes, just propose submodule
+ self.assertEqual(output, "from boom import submodule")
+
+ @patch.dict(sys.modules)
+ def test_attribute_completion_private_and_invalid_names(self):
+ with tempfile.TemporaryDirectory() as _dir:
+ dir = pathlib.Path(_dir)
+ (dir / "foo.py").write_text("_secret = 'bar'")
+ with patch.object(sys, "path", [_dir, *sys.path]):
+ mod = importlib.import_module("foo")
+ mod.__dict__["invalid-identifier"] = "baz"
+ cases = (
+ ("from foo import \t\n", "from foo import "),
+ ("from foo import _s\t\n", "from foo import _secret"),
+ ("from foo import inv\t\n", "from foo import inv"),
+ )
+ for code, expected in cases:
+ with self.subTest(code=code):
+ events = code_to_events(code)
+ reader = self.prepare_reader(events, namespace={})
+ output = reader.readline()
+ self.assertEqual(output, expected)
+
+
def test_get_path_and_prefix(self):
cases = (
('', ('', '')),
with self.subTest(code=code):
self.assertEqual(actual, None)
+ @patch.dict(sys.modules)
+ def test_suggestions_and_messages(self) -> None:
+ # more unitary tests checking the exact suggestions provided
+ # (sorting, de-duplication, import action...)
+ _prompt = ("[ module not imported, press again to import it "
+ "and propose attributes ]")
+ _error = "[ error during import: division by zero ]"
+ with tempfile.TemporaryDirectory() as _dir:
+ dir = pathlib.Path(_dir)
+ (dir / "foo.py").write_text("bar = 42")
+ (dir / "boom.py").write_text("1/0")
+ (dir / "pack").mkdir()
+ (dir / "pack" / "__init__.py").write_text("foo = 1; bar = 2;")
+ (dir / "pack" / "bar.py").touch()
+ sys.modules.pop("graphlib", None) # test modules may have been imported by previous tests
+ sys.modules.pop("string.templatelib", None)
+ with patch.object(sys, "path", [_dir, *sys.path]):
+ pkgutil.get_importer(_dir).invalidate_caches()
+ # NOTE: Cases are intentionally sequential and share completer
+ # state. Earlier cases may import modules that later cases
+ # depend on. Do NOT reorder without understanding dependencies.
+ cases = (
+ # no match != not an import
+ ("import nope", ([], None), set()),
+ ("improt nope", None, set()),
+ # names sorting
+ ("import col", (["collections", "colorsys"], None), set()),
+ # module auto-import
+ ("import fo", (["foo"], None), set()),
+ ("from foo import ", ([], (_prompt, None)), {"foo"}),
+ ("from foo import ", (["bar"], None), set()), # now imported
+ ("from foo import ba", (["bar"], None), set()),
+ # error during import
+ ("from boom import ", ([], (_prompt, _error)), set()),
+ ("from boom import ", ([], None), set()), # do not retry
+ # packages
+ ("from collections import a", (["abc"], None), set()),
+ ("from pack import ", (["bar"], (_prompt, None)), {"pack"}),
+ ("from pack import ", (["bar", "foo"], None), set()),
+ ("from pack.bar import ", ([], (_prompt, None)), {"pack.bar"}),
+ ("from pack.bar import ", ([], None), set()),
+ # stdlib = auto-imported
+ ("from graphlib import T", (["TopologicalSorter"], None), {"graphlib"}),
+ ("from string.templatelib import c", (["convert"], None), {"string.templatelib"}),
+ )
+ completer = ModuleCompleter()
+ for i, (code, expected, expected_imports) in enumerate(cases):
+ with self.subTest(code=code, i=i):
+ _imported = set(sys.modules.keys())
+ result = completer.get_completions(code)
+ self.assertEqual(result is None, expected is None)
+ if result:
+ compl, act = result
+ self.assertEqual(compl, expected[0])
+ self.assertEqual(act is None, expected[1] is None)
+ if act:
+ msg, func = act
+ self.assertEqual(msg, expected[1][0])
+ act_result = func()
+ self.assertEqual(act_result, expected[1][1])
+
+ new_imports = sys.modules.keys() - _imported
+ self.assertSetEqual(new_imports, expected_imports)
+
+
+# Audit hook used to check for stdlib modules import side-effects
+# Defined globally to avoid adding one hook per test run (refleak)
+_audit_events: set[str] | None = None
+
+
+def _hook(name: str, _args: tuple):
+ if _audit_events is not None: # No-op when not activated
+ _audit_events.add(name)
+sys.addaudithook(_hook)
+
+
+@contextlib.contextmanager
+def _capture_audit_events():
+ global _audit_events
+ _audit_events = set()
+ try:
+ yield _audit_events
+ finally:
+ _audit_events = None
+
+
+class TestModuleCompleterAutomaticImports(TestCase):
+ def test_no_side_effects(self):
+ from test.test___all__ import AllTest # TODO: extract to a helper?
+
+ completer = ModuleCompleter()
+ for _, modname in AllTest().walk_modules(completer._stdlib_path, ""):
+ with self.subTest(modname=modname):
+ with (captured_stdout() as out,
+ captured_stderr() as err,
+ _capture_audit_events() as audit_events,
+ (patch("tkinter._tkinter.create") if tkinter
+ else contextlib.nullcontext()) as tk_mock,
+ warnings.catch_warnings(action="ignore")):
+ completer._maybe_import_module(modname)
+ # Test no module is imported that
+ # 1. prints any text
+ self.assertEqual(out.getvalue(), "")
+ self.assertEqual(err.getvalue(), "")
+ # 2. spawn any subprocess (eg. webbrowser.open)
+ self.assertNotIn("subprocess.Popen", audit_events)
+ # 3. launch a Tk window
+ if tk_mock is not None:
+ tk_mock.assert_not_called()
+
class TestHardcodedSubmodules(TestCase):
+ @patch.dict(sys.modules)
def test_hardcoded_stdlib_submodules_are_importable(self):
for parent_path, submodules in HARDCODED_SUBMODULES.items():
for module_name in submodules: