return d
-# Accumulated entry points from .start files across all site-packages
-# directories. Execution is deferred until all paths in .pth files have been
-# appended to sys.path. Map the .pth/.start file the data is found in to the
-# data.
-_pending_entrypoints = {}
-_pending_syspaths = {}
-_pending_importexecs = {}
+# PEP 829 implementation notes.
+#
+# Startup information (.pth and .start file information) can be processed in
+# implicit or explicit batches. Implicit batches are handled by the site.py
+# machinery automatically, while explicit batches are driven by user code and
+# processed on boundaries defined by that code.
+#
+# addsitedir() calls which use the default defer_processing_start_files=False
+# are self-contained: they create a per-call _StartupState, populate it from
+# the site directory's .pth/.start files, run process() on it, and then throw
+# the state away. This is implicit batching and in that case the
+# _startup_state global variable stays None.
+#
+# main() needs different semantics: it accumulates state across multiple
+# addsitedir() calls (user-site plus all global site-packages) so that
+# every sys.path extension is visible *before* any startup code (.pth
+# import lines and .start entry points) runs. Callers opt into this by
+# passing defer_processing_start_files=True, which preserves the _StartupState
+# into the global _startup_state. Subsequent addsitedir() calls (with
+# or without defer_processing_start_files=True) then write into that
+# same shared state, and a later process_startup_files() call flushes
+# all the state and resets the global to None.
+#
+# Here's the CRITICAL reentrancy invariant: process_startup_files() must clear
+# the global _startup_state *before* calling state.process(), so that any
+# reentrant site.addsitedir() calls reached from an exec'd .pth import line or
+# a .start entry point falls into the per-call branch and gets its own fresh
+# state. Otherwise the recursive addsitedir() would mutate the very dicts
+# that the outer state.process() is iterating. This is the bug reported in
+# gh-149504.
+_startup_state = None
def _read_pthstart_file(sitedir, name, suffix):
return None, filename
try:
- # Accept BOM markers in .start and .pth files as we do in source files (Windows PowerShell
- # 5.1 makes it hard to emit UTF-8 files without a BOM).
+ # Accept BOM markers in .start and .pth files as we do in source files
+ # (Windows PowerShell 5.1 makes it hard to emit UTF-8 files without a BOM).
content = raw_content.decode("utf-8-sig")
except UnicodeDecodeError:
_trace(f"Cannot read {filename!r} as UTF-8.")
- # For .pth files only, and then only until Python 3.20, fallback to locale encoding for
- # backward compatibility.
+ # For .pth files only, and then only until Python 3.20, fall back to
+ # locale encoding for backward compatibility.
_warn_future_us(
".pth files decoded to locale encoding as a fallback",
remove=(3, 20)
return content.splitlines(), filename
-def _read_pth_file(sitedir, name, known_paths):
- """Parse a .pth file, accumulating sys.path extensions and import lines.
-
- Errors on individual lines do not abort processing of the rest of the
- file (PEP 829).
- """
- lines, filename = _read_pthstart_file(sitedir, name, ".pth")
- if lines is None:
- return
-
- for n, line in enumerate(lines, 1):
- line = line.strip()
- if not line or line.startswith("#"):
- continue
-
- # In Python 3.18 and 3.19, `import` lines are silently ignored. In
- # Python 3.20 and beyond, issue a warning when `import` lines in .pth
- # files are detected.
- if line.startswith(("import ", "import\t")):
- _warn_future_us(
- "import lines in .pth files are silently ignored",
- remove=(3, 18)
- )
- _warn_future_us(
- "import lines in .pth files are noisily ignored",
- remove=(3, 20)
- )
- _pending_importexecs.setdefault(filename, []).append(line)
- continue
-
- try:
- dir_, dircase = makepath(sitedir, line)
- except Exception as exc:
- _trace(f"Error in {filename!r}, line {n:d}: {line!r}", exc)
- continue
-
- if dircase in known_paths:
- _trace(f"In {filename!r}, line {n:d}: "
- f"skipping duplicate sys.path entry: {dir_}")
- else:
- _pending_syspaths.setdefault(filename, []).append(dir_)
- known_paths.add(dircase)
+class _StartupState:
+ """Per-batch accumulator for .pth and .start file processing.
+ A _StartupState collects sys.path extensions, deprecated .pth import
+ lines, and .start entry points read from one or more site-packages
+ directories. Calling process() applies them in PEP 829 order: paths
+ are added to sys.path first, then import lines from .pth files (skipping
+ any with a matching .start), then entry points from .start files.
-def _read_start_file(sitedir, name):
- """Parse a .start file for a list of entry point strings."""
- lines, filename = _read_pthstart_file(sitedir, name, ".start")
- if lines is None:
- return
+ State lives entirely on the instance; there is no module-level pending
+ state. This is what makes the module reentrancy-safe: a site.addsitedir()
+ call reached recursively from an exec'd import line or a .start entry
+ point operates on a different _StartupState than the one being processed
+ by the outer call.
- # PEP 829: the *presence* of a matching .start file disables `import`
- # line processing in the matched .pth file, regardless of whether the
- # .start file produced any entry points. Register the filename as a
- # key now so an empty (or comment-only) .start file still suppresses.
- entrypoints = _pending_entrypoints.setdefault(filename, [])
+ The internal data is intentionally private; the public methods
+ (read_pth_file, read_start_file, process) are the only supported write
+ APIs.
+ """
+ __slots__ = ('_syspaths', '_importexecs', '_entrypoints')
+
+ def __init__(self):
+ # All three dicts map "<full path to .pth or .start file>" -> list
+ # of items collected from that file. Mapping by filename lets us
+ # cross-reference a .pth and its matching .start (PEP 829 import
+ # suppression rule) and lets _print_error report the source file
+ # when an entry fails.
+ self._syspaths = {}
+ self._importexecs = {}
+ self._entrypoints = {}
+
+ def read_pth_file(self, sitedir, name, known_paths):
+ """Parse a .pth file, accumulating sys.path extensions and import lines.
+
+ Errors on individual lines do not abort processing of the rest of
+ the file (PEP 829). ``known_paths`` is the per-batch dedup
+ ledger: any path already in it is skipped, and newly accepted
+ paths are added to it so that subsequent .pth files in the same
+ batch don't add them more than once.
+ """
+ lines, filename = _read_pthstart_file(sitedir, name, ".pth")
+ if lines is None:
+ return
- for n, line in enumerate(lines, 1):
- line = line.strip()
- if not line or line.startswith("#"):
- continue
- # Syntax validation is deferred to entry-point execution time,
- # where pkgutil.resolve_name(strict=True) enforces the
- # pkg.mod:callable form.
- entrypoints.append(line)
-
-
-def _extend_syspath():
- # We've already filtered out duplicates, either in the existing sys.path
- # or in all the .pth files we've seen. We've also abspath/normpath'd all
- # the entries, so all that's left to do is to ensure that the path exists.
- for filename, dirs in _pending_syspaths.items():
- for dir_ in dirs:
- if os.path.exists(dir_):
- _trace(f"Extending sys.path with {dir_} from {filename}")
- sys.path.append(dir_)
- else:
- _print_error(
- f"In {filename}: {dir_} does not exist; "
- f"skipping sys.path append")
-
-
-def _exec_imports():
- # For all the `import` lines we've seen in .pth files, exec() them in
- # order. However, if they come from a file with a matching .start, then
- # we ignore these import lines. For the ones we do process, print a
- # warning but only when -v was given.
- for filename, imports in _pending_importexecs.items():
- name, dot, pth = filename.rpartition(".")
- assert dot == "." and pth == "pth", f"Bad startup filename: {filename}"
-
- if f"{name}.start" in _pending_entrypoints:
- # Skip import lines in favor of entry points.
- continue
+ for n, line in enumerate(lines, 1):
+ line = line.strip()
+ if not line or line.startswith("#"):
+ continue
- _trace(
- f"import lines in {filename} are deprecated, "
- f"use entry points in a {name}.start file instead."
- )
+ # In Python 3.18 and 3.19, `import` lines are silently
+ # ignored. In Python 3.20 and beyond, issue a warning when
+ # `import` lines in .pth files are detected.
+ if line.startswith(("import ", "import\t")):
+ _warn_future_us(
+ "import lines in .pth files are silently ignored",
+ remove=(3, 18),
+ )
+ _warn_future_us(
+ "import lines in .pth files are noisily ignored",
+ remove=(3, 20),
+ )
+ self._importexecs.setdefault(filename, []).append(line)
+ continue
- for line in imports:
try:
- _trace(f"Exec'ing from {filename}: {line}")
- exec(line)
+ dir_, dircase = makepath(sitedir, line)
except Exception as exc:
- _print_error(
- f"Error in import line from {filename}: {line}", exc)
+ _trace(f"Error in {filename!r}, line {n:d}: {line!r}", exc)
+ continue
+ # PEP 829 dedup: skip paths already seen in this batch. See
+ # _startup_state docstring above for batch lifetimes.
+ if dircase in known_paths:
+ _trace(
+ f"In {filename!r}, line {n:d}: "
+ f"skipping duplicate sys.path entry: {dir_}"
+ )
+ else:
+ self._syspaths.setdefault(filename, []).append(dir_)
+ known_paths.add(dircase)
-def _execute_start_entrypoints():
- """Execute all accumulated .start file entry points.
+ def read_start_file(self, sitedir, name):
+ """Parse a .start file for a list of entry point strings."""
+ lines, filename = _read_pthstart_file(sitedir, name, ".start")
+ if lines is None:
+ return
- Called after all site-packages directories have been processed so that
- sys.path is fully populated before any entry point code runs. Uses
- pkgutil.resolve_name(strict=True) which both validates the strict
- pkg.mod:callable form and resolves the entry point in one step.
- """
- for filename, entrypoints in _pending_entrypoints.items():
- for entrypoint in entrypoints:
- try:
- _trace(f"Executing entry point: {entrypoint} from {filename}")
- callable_ = pkgutil.resolve_name(entrypoint, strict=True)
- except ValueError as exc:
- _print_error(
- f"Invalid entry point syntax in {filename}: "
- f"{entrypoint!r}", exc)
+ # PEP 829: the *presence* of a matching .start file disables `import`
+ # line processing in the matched .pth file, regardless of whether this
+ # .start file contains any entry points. Register the filename as a
+ # key now so an empty (or comment-only) .start file still suppresses.
+ entrypoints = self._entrypoints.setdefault(filename, [])
+
+ for n, line in enumerate(lines, 1):
+ line = line.strip()
+ if not line or line.startswith("#"):
continue
- except Exception as exc:
- _print_error(
- f"Error resolving entry point {entrypoint} "
- f"from {filename}", exc)
+ # Syntax validation is deferred to entry point execution
+ # time, where pkgutil.resolve_name(strict=True) enforces the
+ # pkg.mod:callable form.
+ entrypoints.append(line)
+
+ def process(self):
+ """Apply accumulated state in PEP 829 order.
+
+ Phase order matters: all .pth path extensions are applied to
+ sys.path *before* any import line or .start entry point runs, so
+ that an entry point may live in a module reachable only via a
+ .pth-extended path.
+ """
+ self._extend_syspath()
+ self._exec_imports()
+ self._execute_start_entrypoints()
+
+ def _extend_syspath(self):
+ # Duplicates have already been filtered (in existing sys.path or
+ # across .pth files via known_paths), and entries are already
+ # abspath/normpath'd, so all that remains is to confirm the path
+ # exists.
+ for filename, dirs in self._syspaths.items():
+ for dir_ in dirs:
+ if os.path.exists(dir_):
+ _trace(f"Extending sys.path with {dir_} from {filename}")
+ sys.path.append(dir_)
+ else:
+ _print_error(
+ f"In {filename}: {dir_} does not exist; "
+ f"skipping sys.path append"
+ )
+
+ def _exec_imports(self):
+ # For each `import` line we've seen in a .pth file, exec() it in
+ # order, unless the .pth has a matching .start file in this same
+ # batch. In that case, PEP 829 says the import lines are
+ # suppressed in favor of the .start's entry points.
+ for filename, imports in self._importexecs.items():
+ # Given "/path/to/foo.pth", check whether "/path/to/foo.start" was
+ # registered in this same batch.
+ name, dot, pth = filename.rpartition(".")
+ assert dot == "." and pth == "pth", (
+ f"Bad startup filename: {filename}"
+ )
+ if f"{name}.start" in self._entrypoints:
+ _trace(
+ f"import lines in {filename} are suppressed "
+ f"due to matching {name}.start file."
+ )
continue
- try:
- callable_()
- except Exception as exc:
- _print_error(
- f"Error in entry point {entrypoint} from {filename}",
- exc)
+
+ _trace(
+ f"import lines in {filename} are deprecated, "
+ f"use entry points in a {name}.start file instead."
+ )
+ for line in imports:
+ try:
+ _trace(f"Exec'ing from {filename}: {line}")
+ exec(line)
+ except Exception as exc:
+ _print_error(
+ f"Error in import line from {filename}: {line}",
+ exc,
+ )
+
+ def _execute_start_entrypoints(self):
+ # Resolve each entry point string to a callable via
+ # pkgutil.resolve_name(strict=True), which both validates the
+ # required pkg.mod:callable form and performs the import in one
+ # step, then call it with no arguments.
+ for filename, entrypoints in self._entrypoints.items():
+ for entrypoint in entrypoints:
+ try:
+ _trace(
+ f"Executing entry point: {entrypoint} from {filename}"
+ )
+ callable_ = pkgutil.resolve_name(entrypoint, strict=True)
+ except ValueError as exc:
+ _print_error(
+ f"Invalid entry point syntax in {filename}: "
+ f"{entrypoint!r}",
+ exc,
+ )
+ except Exception as exc:
+ _print_error(
+ f"Error resolving entry point {entrypoint} "
+ f"from {filename}",
+ exc,
+ )
+ else:
+ try:
+ callable_()
+ except Exception as exc:
+ _print_error(
+ f"Error in entry point {entrypoint} from {filename}",
+ exc,
+ )
def process_startup_files():
- """Flush all pending sys.path and entry points."""
- _extend_syspath()
- _exec_imports()
- _execute_start_entrypoints()
- _pending_syspaths.clear()
- _pending_importexecs.clear()
- _pending_entrypoints.clear()
+ """Flush any pending startup-file state accumulated during a batch.
+
+ Used by main() (and any external caller that drove addsitedir() with
+ defer_processing_start_files=True) to apply the accumulated paths
+ and run the deferred import lines / entry points.
+
+ Reentrancy: the active batch state is detached from _startup_state
+ *before* state.process() runs. This way, if an exec'd import line
+ or .start entry point itself calls site.addsitedir(), that call
+ creates its own per-call _StartupState rather than mutating the dicts
+ being iterated here. See gh-149504.
+ """
+ global _startup_state
+ if _startup_state is None:
+ return
+ state, _startup_state = _startup_state, None
+ state.process()
def addpackage(sitedir, name, known_paths):
reset = True
else:
reset = False
- _read_pth_file(sitedir, name, known_paths)
- process_startup_files()
- if reset:
- known_paths = None
- return known_paths
+
+ # If a batch is already in progress (for example, main() is still
+ # accumulating sitedirs), participate in the batch by writing into the
+ # shared _startup_state and letting the eventual process_startup_files()
+ # flush it. Otherwise this is a standalone call, so create a unique
+ # per-call state, populate it, and process it before returning.
+ if _startup_state is None:
+ state = _StartupState()
+ state.read_pth_file(sitedir, name, known_paths)
+ state.process()
+ else:
+ _startup_state.read_pth_file(sitedir, name, known_paths)
+
+ return None if reset else known_paths
def addsitedir(sitedir, known_paths=None, *, defer_processing_start_files=False):
"""Add 'sitedir' argument to sys.path if missing and handle startup
files."""
+ global _startup_state
_trace(f"Adding directory: {sitedir!r}")
if known_paths is None:
known_paths = _init_pathinfo()
# If the normcase'd new sitedir isn't already known, append it to
# sys.path, keep a record of it, and process all .pth and .start files
# found in that directory. If the new sitedir is known, be sure not
- # to process all of those twice! gh-75723
+ # to process all of those more than once! gh-75723
if sitedircase not in known_paths:
sys.path.append(sitedir)
known_paths.add(sitedircase)
try:
names = os.listdir(sitedir)
except OSError:
- return
+ return None if reset else known_paths
+
+ # Pick the _StartupState we'll write into. There are three cases:
+ #
+ # 1. A batch is already active (_startup_state is set, e.g. because
+ # main() previously called us with
+ # defer_processing_start_files=True). Participate in this batch by
+ # sharing the same state. Don't flush the state since the batch's
+ # eventual process_startup_files() will do that.
+ #
+ # 2. There is no active batch but the caller passed
+ # defer_processing_start_files=True. Preserve a fresh
+ # _StartupState into the global _startup_state so that subsequent
+ # addsitedir() calls participate in this batch, and so that the
+ # caller's later process_startup_files() finds it.
+ #
+ # 3. This is a standalone call (there is no active batch and
+ # defer_processing_start_files=False). Create a unique per-call
+ # state, populate it, process it, and then clear it. Per-call
+ # state is what makes reentrant addsitedir() safe; a recursive call
+ # from inside process() lands here too and gets its own independent
+ # state.
+
+ if _startup_state is not None:
+ state = _startup_state
+ flush_now = False
+ elif defer_processing_start_files:
+ state = _startup_state = _StartupState()
+ flush_now = False
+ else:
+ state = _StartupState()
+ flush_now = True
# The following phases are defined by PEP 829.
# Phases 1-3: Read .pth files, accumulating paths and import lines.
if name.endswith(".pth") and not name.startswith(".")
)
for name in pth_names:
- _read_pth_file(sitedir, name, known_paths)
+ state.read_pth_file(sitedir, name, known_paths)
# Phases 6-7: Discover .start files and accumulate their entry points.
- # Import lines from .pth files with a matching .start file are discarded
- # at flush time by _exec_imports().
+ # Import lines from .pth files with a matching .start file are
+ # discarded at flush time by _StartupState._exec_imports().
start_names = sorted(
name for name in names
if name.endswith(".start") and not name.startswith(".")
)
for name in start_names:
- _read_start_file(sitedir, name)
+ state.read_start_file(sitedir, name)
- # Generally, when addsitedir() is called explicitly, we'll want to process
- # all the startup file data immediately. However, when called through
- # main(), we'll want to batch up all the startup file processing. main()
- # will set this flag to True to defer processing.
- if not defer_processing_start_files:
- process_startup_files()
+ if flush_now:
+ state.process()
- if reset:
- return None
-
- return known_paths
+ return None if reset else known_paths
def check_enableusersite():
def setUp(self):
self.enterContext(import_helper.DirsOnSysPath())
self.tmpdir = self.sitedir = self.enterContext(os_helper.temp_dir())
- # Save and clear all pending dicts.
- self.saved_entrypoints = site._pending_entrypoints.copy()
- self.saved_syspaths = site._pending_syspaths.copy()
- self.saved_importexecs = site._pending_importexecs.copy()
- site._pending_entrypoints.clear()
- site._pending_syspaths.clear()
- site._pending_importexecs.clear()
+ # Each test gets its own _StartupState to drive the parser and
+ # processor methods directly. Defensively clear any _startup_state
+ # that a prior test may have left set via defer_processing_start_files
+ # without a corresponding process_startup_files() flush.
+ self.state = site._StartupState()
+ site._startup_state = None
+ self.addCleanup(self._reset_startup_state)
- def tearDown(self):
- site._pending_entrypoints = self.saved_entrypoints.copy()
- site._pending_syspaths = self.saved_syspaths.copy()
- site._pending_importexecs = self.saved_importexecs.copy()
+ def _reset_startup_state(self):
+ site._startup_state = None
def _make_start(self, content, name='testpkg'):
"""Write a <name>.start file and return its basename."""
return extdir
def _all_entrypoints(self):
- """Flatten _pending_entrypoints dict into a list of (filename, entry) tuples."""
+ """Flatten state._entrypoints into a list of (filename, entry) tuples."""
result = []
- for filename, entries in site._pending_entrypoints.items():
+ for filename, entries in self.state._entrypoints.items():
for entry in entries:
result.append((filename, entry))
return result
def _just_entrypoints(self):
return [entry for filename, entry in self._all_entrypoints()]
- # --- _read_start_file tests ---
+ # There are two classes of tests here. Tests that start with `test_impl_`
+ # know details about the implementation and they access non-public methods
+ # and data structures to perform focused functional tests.
+ #
+ # Tests that start with `test_addsitedir_` are end-to-end tests that ensure
+ # integration semantics and functionality as a caller of the public
+ # surfaces would see.
+
+ # --- _StartupState.read_start_file tests ---
- def test_read_start_file_basic(self):
+ def test_impl_read_start_file_basic(self):
self._make_start("os.path:join\n", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints[fullname], ['os.path:join'])
+ self.assertEqual(
+ self.state._entrypoints[fullname], ['os.path:join']
+ )
- def test_read_start_file_multiple_entries(self):
+ def test_impl_read_start_file_multiple_entries(self):
self._make_start("os.path:join\nos.path:exists\n", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints[fullname],
- ['os.path:join', 'os.path:exists'])
+ self.assertEqual(
+ self.state._entrypoints[fullname],
+ ['os.path:join', 'os.path:exists'],
+ )
- def test_read_start_file_comments_and_blanks(self):
+ def test_impl_read_start_file_comments_and_blanks(self):
self._make_start("# a comment\n\nos.path:join\n \n", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints[fullname], ['os.path:join'])
+ self.assertEqual(
+ self.state._entrypoints[fullname], ['os.path:join']
+ )
- def test_read_start_file_accepts_all_non_blank_lines(self):
+ def test_impl_read_start_file_accepts_all_non_blank_lines(self):
# Syntax validation is deferred to entry-point execution time
# (where pkgutil.resolve_name(strict=True) enforces the strict
# pkg.mod:callable form), so parsing accepts every non-blank,
"os.path:join\n" # valid
)
self._make_start(content, name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints[fullname], [
+ self.assertEqual(self.state._entrypoints[fullname], [
'os.path',
'pkg.mod:',
':callable',
'os.path:join',
])
- def test_read_start_file_empty(self):
+ def test_impl_read_start_file_empty(self):
# PEP 829: an empty .start file is still registered as present
- # (with an empty entry-point list) so that it suppresses `import`
+ # (with an empty entry point list) so that it suppresses `import`
# lines in any matching .pth file.
self._make_start("", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints, {fullname: []})
+ self.assertEqual(self.state._entrypoints, {fullname: []})
- def test_read_start_file_comments_only(self):
+ def test_impl_read_start_file_comments_only(self):
# As with an empty file, a comments-only .start file is registered
# as present so it can suppress matching .pth `import` lines.
self._make_start("# just a comment\n# another\n", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints, {fullname: []})
+ self.assertEqual(self.state._entrypoints, {fullname: []})
- def test_read_start_file_nonexistent(self):
+ def test_impl_read_start_file_nonexistent(self):
with captured_stderr():
- site._read_start_file(self.tmpdir, 'nonexistent.start')
- self.assertEqual(site._pending_entrypoints, {})
+ self.state.read_start_file(self.tmpdir, 'nonexistent.start')
+ self.assertEqual(self.state._entrypoints, {})
@unittest.skipUnless(hasattr(os, 'chflags'), 'test needs os.chflags()')
- def test_read_start_file_hidden_flags(self):
+ def test_impl_read_start_file_hidden_flags(self):
self._make_start("os.path:join\n", name='foo')
filepath = os.path.join(self.tmpdir, 'foo.start')
st = os.stat(filepath)
os.chflags(filepath, st.st_flags | stat.UF_HIDDEN)
- site._read_start_file(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints, {})
+ self.state.read_start_file(self.sitedir, 'foo.start')
+ self.assertEqual(self.state._entrypoints, {})
- def test_read_start_file_duplicates_not_deduplicated(self):
+ def test_impl_one_start_file_with_duplicates_not_deduplicated(self):
# PEP 829: duplicate entry points are NOT deduplicated.
self._make_start("os.path:join\nos.path:join\n", name='foo')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints[fullname],
- ['os.path:join', 'os.path:join'])
+ self.assertEqual(
+ self.state._entrypoints[fullname],
+ ['os.path:join', 'os.path:join'],
+ )
+
+ def test_impl_two_start_files_with_duplicates_not_deduplicated(self):
+ self._make_start("os.path:join", name="foo")
+ self._make_start("os.path:join", name="bar")
+ self.state.read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'bar.start')
+ self.assertEqual(
+ self._just_entrypoints(),
+ ['os.path:join', 'os.path:join'],
+ )
- def test_read_start_file_accepts_utf8_bom(self):
+ def test_impl_read_start_file_accepts_utf8_bom(self):
# PEP 829: .start files MUST be utf-8-sig (UTF-8 with optional BOM).
filepath = os.path.join(self.tmpdir, 'foo.start')
with open(filepath, 'wb') as f:
f.write(b'\xef\xbb\xbf' + b'os.path:join\n')
- site._read_start_file(self.sitedir, 'foo.start')
+ self.state.read_start_file(self.sitedir, 'foo.start')
fullname = os.path.join(self.sitedir, 'foo.start')
self.assertEqual(
- site._pending_entrypoints[fullname], ['os.path:join'])
+ self.state._entrypoints[fullname], ['os.path:join']
+ )
- def test_read_start_file_invalid_utf8_silently_skipped(self):
- # PEP 829: .start files MUST be utf-8-sig. Unlike .pth, there is
- # no locale-encoding fallback -- a .start file that is not valid
+ def test_impl_read_start_file_invalid_utf8_silently_skipped(self):
+ # PEP 829: .start files MUST be utf-8-sig. Unlike .pth files, there
+ # is no locale-encoding fallback. A .start file that is not valid
# UTF-8 is silently skipped, with no key registered in
- # _pending_entrypoints and no output to stderr (parsing errors
- # are reported only under -v).
+ # state._entrypoints and no output to stderr (parsing errors are
+ # reported only under -v).
filepath = os.path.join(self.tmpdir, 'foo.start')
with open(filepath, 'wb') as f:
# Bare continuation byte -- invalid as a UTF-8 start byte.
f.write(b'\x80\x80\x80\n')
with captured_stderr() as err:
- site._read_start_file(self.sitedir, 'foo.start')
- self.assertEqual(site._pending_entrypoints, {})
+ self.state.read_start_file(self.sitedir, 'foo.start')
+ self.assertEqual(self.state._entrypoints, {})
self.assertEqual(err.getvalue(), "")
- def test_two_start_files_with_duplicates_not_deduplicated(self):
- self._make_start("os.path:join", name="foo")
- self._make_start("os.path:join", name="bar")
- site._read_start_file(self.sitedir, 'foo.start')
- site._read_start_file(self.sitedir, 'bar.start')
- self.assertEqual(self._just_entrypoints(),
- ['os.path:join', 'os.path:join'])
+ # --- _StartupState.read_pth_file tests ---
- # --- _read_pth_file tests ---
-
- def test_read_pth_file_paths(self):
+ def test_impl_read_pth_file_paths(self):
subdir = os.path.join(self.sitedir, 'mylib')
os.mkdir(subdir)
self._make_pth("mylib\n", name='foo')
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertIn(subdir, site._pending_syspaths[fullname])
+ self.assertIn(subdir, self.state._syspaths[fullname])
- def test_read_pth_file_imports_collected(self):
+ def test_impl_read_pth_file_imports_collected(self):
self._make_pth("import sys\n", name='foo')
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertEqual(site._pending_importexecs[fullname], ['import sys'])
+ self.assertEqual(
+ self.state._importexecs[fullname], ['import sys']
+ )
- def test_read_pth_file_comments_and_blanks(self):
+ def test_impl_read_pth_file_comments_and_blanks(self):
self._make_pth("# comment\n\n \n", name='foo')
- site._read_pth_file(self.sitedir, 'foo.pth', set())
- self.assertEqual(site._pending_syspaths, {})
- self.assertEqual(site._pending_importexecs, {})
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
+ self.assertEqual(self.state._syspaths, {})
+ self.assertEqual(self.state._importexecs, {})
- def test_read_pth_file_deduplication(self):
+ def test_impl_read_pth_file_deduplication(self):
subdir = os.path.join(self.sitedir, 'mylib')
os.mkdir(subdir)
+ # An accumulator acts as a deduplication ledger.
known_paths = set()
self._make_pth("mylib\n", name='a')
self._make_pth("mylib\n", name='b')
- site._read_pth_file(self.sitedir, 'a.pth', known_paths)
- site._read_pth_file(self.sitedir, 'b.pth', known_paths)
- # Only one entry across both files.
+ self.state.read_pth_file(self.sitedir, 'a.pth', known_paths)
+ self.state.read_pth_file(self.sitedir, 'b.pth', known_paths)
+ # There is only one entry across both files.
all_dirs = []
- for dirs in site._pending_syspaths.values():
+ for dirs in self.state._syspaths.values():
all_dirs.extend(dirs)
self.assertEqual(all_dirs, [subdir])
- def test_read_pth_file_bad_line_continues(self):
- # PEP 829: errors on individual lines don't abort the file.
+ def test_impl_read_pth_file_bad_line_continues(self):
+ # PEP 829: errors on individual lines don't abort processing the file.
subdir = os.path.join(self.sitedir, 'goodpath')
os.mkdir(subdir)
self._make_pth("abc\x00def\ngoodpath\n", name='foo')
with captured_stderr():
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
+ self.assertIn(subdir, self.state._syspaths.get(fullname, []))
def _flags_with_verbose(self, verbose):
# Build a sys.flags clone with verbose overridden but every
# other field preserved, so unrelated reads like
# sys.flags.optimize during io.open_code() continue to work.
- attrs = {name: getattr(sys.flags, name)
- for name in sys.flags.__match_args__}
+ attrs = {
+ name: getattr(sys.flags, name)
+ for name in sys.flags.__match_args__
+ }
attrs['verbose'] = verbose
return SimpleNamespace(**attrs)
- def test_read_pth_file_parse_error_silent_by_default(self):
+ def test_impl_read_pth_file_parse_error_silent_by_default(self):
# PEP 829: parse-time errors are silent unless -v is given.
- # Force the error path by making makepath() raise.
+ # Force the error path by making makepath() raise an exception.
self._make_pth("badline\n", name='foo')
- with mock.patch('site.makepath', side_effect=ValueError("boom")), \
- mock.patch('sys.flags', self._flags_with_verbose(False)), \
- captured_stderr() as err:
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ with (
+ mock.patch('site.makepath', side_effect=ValueError("boom")),
+ mock.patch('sys.flags', self._flags_with_verbose(False)),
+ captured_stderr() as err,
+ ):
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
self.assertEqual(err.getvalue(), "")
- def test_read_pth_file_parse_error_reported_under_verbose(self):
+ def test_impl_read_pth_file_parse_error_reported_under_verbose(self):
# PEP 829: parse-time errors are reported when -v is given.
self._make_pth("badline\n", name='foo')
- with mock.patch('site.makepath', side_effect=ValueError("boom")), \
- mock.patch('sys.flags', self._flags_with_verbose(True)), \
- captured_stderr() as err:
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ with (
+ mock.patch('site.makepath', side_effect=ValueError("boom")),
+ mock.patch('sys.flags', self._flags_with_verbose(True)),
+ captured_stderr() as err,
+ ):
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
out = err.getvalue()
self.assertIn('Error in', out)
self.assertIn('foo.pth', out)
- def test_read_pth_file_locale_fallback(self):
+ def test_impl_read_pth_file_locale_fallback(self):
# PEP 829: .pth files that fail UTF-8 decoding fall back to the
# locale encoding for backward compatibility (deprecated in
# 3.15, to be removed in 3.20). Mock locale.getencoding() so
# \xe9 is invalid UTF-8 but valid in latin-1.
with open(filepath, 'wb') as f:
f.write(b'# caf\xe9 comment\nmylib\n')
- with mock.patch('locale.getencoding', return_value='latin-1'), \
- captured_stderr():
- site._read_pth_file(self.sitedir, 'foo.pth', set())
+ with (
+ mock.patch('locale.getencoding', return_value='latin-1'),
+ captured_stderr(),
+ ):
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
+ self.assertIn(subdir, self.state._syspaths.get(fullname, []))
- # --- _execute_start_entrypoints tests ---
+ # --- _StartupState._execute_start_entrypoints tests ---
- def test_execute_entrypoints_with_callable(self):
- # Entrypoint with callable is invoked.
+ def test_impl_execute_entrypoints_with_callable(self):
+ # An entry point with a callable.
self._make_mod("""\
called = False
def startup():
called = True
""", name='epmod', package=True, on_path=True)
fullname = os.path.join(self.sitedir, 'epmod.start')
- site._pending_entrypoints[fullname] = ['epmod:startup']
- site._execute_start_entrypoints()
+ self.state._entrypoints[fullname] = ['epmod:startup']
+ self.state._execute_start_entrypoints()
import epmod
self.assertTrue(epmod.called)
- def test_execute_entrypoints_import_error(self):
- # Import error prints traceback but continues.
+ def test_impl_execute_entrypoints_import_error(self):
+ # Import errors print a traceback and continue.
fullname = os.path.join(self.sitedir, 'bad.start')
- site._pending_entrypoints[fullname] = [
- 'nosuchmodule_xyz:func', 'os.path:join']
+ self.state._entrypoints[fullname] = [
+ 'nosuchmodule_xyz:func', 'os.path:join',
+ ]
with captured_stderr() as err:
- site._execute_start_entrypoints()
+ self.state._execute_start_entrypoints()
self.assertIn('nosuchmodule_xyz', err.getvalue())
# os.path:join should still have been called (no exception for it)
- def test_execute_entrypoints_strict_syntax_rejection(self):
- # PEP 829: only the strict pkg.mod:callable form is valid.
- # At entry-point execution, pkgutil.resolve_name(strict=True)
- # raises ValueError for invalid syntax; the invalid entry is
- # reported and execution continues with the next one.
+ def test_impl_execute_entrypoints_strict_syntax_rejection(self):
+ # PEP 829: only the strict pkg.mod:callable form is valid. At entry
+ # point execution time, pkgutil.resolve_name(strict=True) raises a
+ # ValueError for the invalid syntax. The invalid entry is reported
+ # and execution continues with the next one.
fullname = os.path.join(self.sitedir, 'bad.start')
- site._pending_entrypoints[fullname] = [
+ self.state._entrypoints[fullname] = [
'os.path', # no colon
'pkg.mod:', # empty callable
':callable', # empty module
'pkg.mod:callable:extra', # multiple colons
]
with captured_stderr() as err:
- site._execute_start_entrypoints()
+ self.state._execute_start_entrypoints()
out = err.getvalue()
self.assertIn('Invalid entry point syntax', out)
- for bad in ('os.path', 'pkg.mod:', ':callable',
- 'pkg.mod:callable:extra'):
+ for bad in (
+ 'os.path',
+ 'pkg.mod:',
+ ':callable',
+ 'pkg.mod:callable:extra',
+ ):
self.assertIn(bad, out)
- def test_execute_entrypoints_callable_error(self):
- # Callable that raises prints traceback but continues.
+ def test_impl_execute_entrypoints_callable_error(self):
+ # A callable that errors prints a traceback but continues.
self._make_mod("""\
def fail():
raise RuntimeError("boom")
""", name='badmod', package=True, on_path=True)
fullname = os.path.join(self.sitedir, 'badmod.start')
- site._pending_entrypoints[fullname] = ['badmod:fail']
+ self.state._entrypoints[fullname] = ['badmod:fail']
with captured_stderr() as err:
- site._execute_start_entrypoints()
+ self.state._execute_start_entrypoints()
self.assertIn('RuntimeError', err.getvalue())
self.assertIn('boom', err.getvalue())
- def test_execute_entrypoints_duplicates_called_twice(self):
+ def test_impl_execute_entrypoints_duplicates_called_twice(self):
# PEP 829: duplicate entry points execute multiple times.
self._make_mod("""\
call_count = 0
def bump():
global call_count
call_count += 1
-""", name='countmod', package=True, on_path=True)
+""", name='countmod', package=False, on_path=True)
fullname = os.path.join(self.sitedir, 'countmod.start')
- site._pending_entrypoints[fullname] = [
- 'countmod:bump', 'countmod:bump']
- site._execute_start_entrypoints()
+ self.state._entrypoints[fullname] = [
+ 'countmod:bump', 'countmod:bump',
+ ]
+ self.state._execute_start_entrypoints()
import countmod
self.assertEqual(countmod.call_count, 2)
- # --- _exec_imports tests ---
+ # --- _StartupState._exec_imports tests ---
- def test_exec_imports_suppressed_by_matching_start(self):
+ def test_impl_exec_imports_suppressed_by_matching_start(self):
# Import lines from foo.pth are suppressed when foo.start exists.
+ self._make_mod("""\
+call_count = 0
+def bump():
+ global call_count
+ call_count += 1
+""", name='countmod', package=False, on_path=True)
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
start_fullname = os.path.join(self.sitedir, 'foo.start')
- site._pending_importexecs[pth_fullname] = ['import sys']
- site._pending_entrypoints[start_fullname] = ['os.path:join']
- # Should not exec the import line; no error expected.
- site._exec_imports()
+ self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()']
+ self.state._entrypoints[start_fullname] = ['os.path:join']
+ self.state._exec_imports()
+ import countmod
+ self.assertEqual(countmod.call_count, 0)
- def test_exec_imports_not_suppressed_by_different_start(self):
+ def test_impl_exec_imports_not_suppressed_by_different_start(self):
# Import lines from foo.pth are NOT suppressed by bar.start.
+ self._make_mod("""\
+call_count = 0
+def bump():
+ global call_count
+ call_count += 1
+""", name='countmod', package=False, on_path=True)
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
start_fullname = os.path.join(self.sitedir, 'bar.start')
- site._pending_importexecs[pth_fullname] = ['import sys']
- site._pending_entrypoints[start_fullname] = ['os.path:join']
- # Should execute the import line without error.
- site._exec_imports()
+ self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()']
+ self.state._entrypoints[start_fullname] = ['os.path:join']
+ self.state._exec_imports()
+ import countmod
+ self.assertEqual(countmod.call_count, 1)
- def test_exec_imports_suppressed_by_empty_matching_start(self):
+ def test_impl_exec_imports_suppressed_by_empty_matching_start(self):
self._make_start("", name='foo')
self._make_pth("import epmod; epmod.startup()", name='foo')
self._make_mod("""\
global called
called = True
""", name='epmod', package=True, on_path=True)
- site._read_pth_file(self.sitedir, 'foo.pth', set())
- site._read_start_file(self.sitedir, 'foo.start')
- site._exec_imports()
+ self.state.read_pth_file(self.sitedir, 'foo.pth', set())
+ self.state.read_start_file(self.sitedir, 'foo.start')
+ self.state._exec_imports()
import epmod
self.assertFalse(epmod.called)
- # --- _extend_syspath tests ---
+ # --- _StartupState._extend_syspath tests ---
- def test_extend_syspath_existing_dir(self):
+ def test_impl_extend_syspath_existing_dir(self):
subdir = os.path.join(self.sitedir, 'extlib')
os.mkdir(subdir)
- site._pending_syspaths['test.pth'] = [subdir]
- site._extend_syspath()
+ self.state._syspaths['test.pth'] = [subdir]
+ self.state._extend_syspath()
self.assertIn(subdir, sys.path)
- def test_extend_syspath_nonexistent_dir(self):
- nosuch = os.path.join(self.sitedir, 'nosuchdir')
- site._pending_syspaths['test.pth'] = [nosuch]
+ def test_impl_extend_syspath_nonexistent_dir(self):
+ nonesuch = os.path.join(self.sitedir, 'nosuchdir')
+ self.state._syspaths['test.pth'] = [nonesuch]
with captured_stderr() as err:
- site._extend_syspath()
- self.assertNotIn(nosuch, sys.path)
+ self.state._extend_syspath()
+ self.assertNotIn(nonesuch, sys.path)
self.assertIn('does not exist', err.getvalue())
# --- addsitedir integration tests ---
+ def test_addsitedir_pth_import_skipped_when_matching_start_exists(self):
+ # PEP 829: an empty .start file disables the matching .pth's import
+ # lines, even when the .start has no entry points of its own.
+ self._make_mod("flag = False\n", name='suppressed', on_path=True)
+ self._make_start("", name='foo')
+ self._make_pth(
+ "import suppressed; suppressed.flag = True\n",
+ name='foo')
+ site.addsitedir(self.sitedir, set())
+ import suppressed
+ self.assertFalse(
+ suppressed.flag,
+ "import line in foo.pth should be suppressed by foo.start")
+
+ def test_addsitedir_dotfile_start_entrypoint_not_executed(self):
+ # .start files starting with '.' are skipped, so their entry
+ # points must not run.
+ self._make_mod("""\
+called = False
+def hook():
+ global called
+ called = True
+""",
+ name='dotted', on_path=True)
+ self._make_start("dotted:hook\n", name='.hidden')
+ site.addsitedir(self.sitedir, set())
+ import dotted
+ self.assertFalse(dotted.called)
+
+ def test_addsitedir_dedups_paths_across_pth_files(self):
+ # PEP 829: when multiple .pth files reference the same path within
+ # a single addsitedir() invocation, the path is appended to
+ # sys.path exactly once.
+ subdir = os.path.join(self.sitedir, 'shared')
+ os.mkdir(subdir)
+ self._make_pth("shared\n", name='a')
+ self._make_pth("shared\n", name='b')
+ before = sys.path.count(subdir)
+ site.addsitedir(self.sitedir, set())
+ self.assertEqual(sys.path.count(subdir), before + 1)
+
def test_addsitedir_discovers_start_files(self):
# addsitedir() should discover .start files and accumulate entries.
+ # With defer_processing_start_files=True the preserved state lives on
+ # site._startup_state and isn't flushed until the caller invokes
+ # process_startup_files().
self._make_start("os.path:join\n", name='foo')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertIn('os.path:join', site._pending_entrypoints[fullname])
+ self.assertIn(
+ 'os.path:join', site._startup_state._entrypoints[fullname]
+ )
- def test_addsitedir_start_suppresses_pth_imports(self):
+ def test_impl_exec_imports_skips_when_matching_start(self):
# When foo.start exists, import lines in foo.pth are skipped
- # at flush time by _exec_imports().
+ # at flush time by _StartupState._exec_imports().
self._make_start("os.path:join\n", name='foo')
self._make_pth("import sys\n", name='foo')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
start_fullname = os.path.join(self.sitedir, 'foo.start')
# Import line was collected...
- self.assertIn('import sys',
- site._pending_importexecs.get(pth_fullname, []))
+ self.assertIn(
+ 'import sys',
+ site._startup_state._importexecs.get(pth_fullname, []),
+ )
# ...but _exec_imports() will skip it because foo.start exists.
- site._exec_imports()
+ site._startup_state._exec_imports()
def test_addsitedir_pth_paths_still_work_with_start(self):
# Path lines in .pth files still work even when a .start file exists.
os.mkdir(subdir)
self._make_start("os.path:join\n", name='foo')
self._make_pth("mylib\n", name='foo')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
+ self.assertIn(
+ subdir, site._startup_state._syspaths.get(fullname, [])
+ )
def test_addsitedir_start_alphabetical_order(self):
# Multiple .start files are discovered alphabetically.
+ # _all_entrypoints() reads from self.state, so swap in the
+ # preserved batch state for the duration of the assertion.
self._make_start("os.path:join\n", name='zzz')
self._make_start("os.path:exists\n", name='aaa')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
+ self.state = site._startup_state
all_entries = self._all_entrypoints()
entries = [entry for _, entry in all_entries]
idx_a = entries.index('os.path:exists')
os.mkdir(subdir)
self._make_pth("mylib\n", name='foo')
self._make_start("os.path:join\n", name='foo')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
# Both should be collected.
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
start_fullname = os.path.join(self.sitedir, 'foo.start')
- self.assertIn(subdir, site._pending_syspaths.get(pth_fullname, []))
- self.assertIn('os.path:join',
- site._pending_entrypoints.get(start_fullname, []))
+ self.assertIn(
+ subdir, site._startup_state._syspaths.get(pth_fullname, [])
+ )
+ self.assertIn(
+ 'os.path:join',
+ site._startup_state._entrypoints.get(start_fullname, []),
+ )
- def test_addsitedir_dotfile_start_ignored(self):
+ def test_impl_addsitedir_skips_dotfile_start(self):
# .start files starting with '.' are skipped. Defer flushing so
- # the assertion against _pending_entrypoints is meaningful;
- # otherwise process_startup_files() would clear the dict
- # regardless of whether the dotfile was picked up.
+ # the preserved batch state stays inspectable on
+ # site._startup_state; otherwise process_startup_files() would
+ # detach and consume it regardless of whether the dotfile was
+ # picked up.
self._make_start("os.path:join\n", name='.hidden')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
- self.assertEqual(site._pending_entrypoints, {})
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
+ self.assertEqual(site._startup_state._entrypoints, {})
def test_addsitedir_standalone_flushes(self):
- # When called with known_paths=None (standalone), addsitedir
- # flushes immediately so the caller sees the effect.
+ # When called with defer_processing_start_files=False (the
+ # default), addsitedir creates a per-call _StartupState and
+ # processes it before returning, so the caller sees the effect
+ # immediately. No batch state is left behind on
+ # site._startup_state.
subdir = os.path.join(self.sitedir, 'flushlib')
os.mkdir(subdir)
self._make_pth("flushlib\n", name='foo')
site.addsitedir(self.sitedir) # known_paths=None
self.assertIn(subdir, sys.path)
- # Pending dicts should be cleared after flush.
- self.assertEqual(site._pending_syspaths, {})
+ self.assertIsNone(site._startup_state)
def test_addsitedir_defer_does_not_flush(self):
# With defer_processing_start_files=True, addsitedir accumulates
# pending state but does not flush; sys.path is updated only when
- # process_startup_files() is called explicitly.
+ # process_startup_files() is called explicitly. The accumulated
+ # state lives on the lazily-promoted site._startup_state.
subdir = os.path.join(self.sitedir, 'acclib')
os.mkdir(subdir)
self._make_pth("acclib\n", name='foo')
- site.addsitedir(self.sitedir, set(),
- defer_processing_start_files=True)
+ site.addsitedir(
+ self.sitedir, set(),
+ defer_processing_start_files=True,
+ )
# Path is pending, not yet on sys.path.
self.assertNotIn(subdir, sys.path)
fullname = os.path.join(self.sitedir, 'foo.pth')
- self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
+ self.assertIn(
+ subdir, site._startup_state._syspaths.get(fullname, [])
+ )
def test_pth_path_is_available_to_start_entrypoint(self):
# Core PEP 829 invariant: all .pth path extensions are applied to
import mod
self.assertEqual(mod._pth_count, 1)
+ # gh-149504
+ def test_reentrant_addsitedir_pth(self):
+ # An import line in a .pth file that calls site.addsitedir()
+ # must not crash or re-execute outer entries while the outer
+ # call is still processing its pending startup state.
+ overlay = self.enterContext(os_helper.temp_dir())
+ overlay_pth = os.path.join(overlay, 'overlay.pth')
+ pkgdir = self.enterContext(os_helper.temp_dir())
+ with open(overlay_pth, 'w', encoding='utf-8') as fp:
+ print(pkgdir, file=fp)
+ self._make_pth(f"import site; site.addsitedir({overlay!r})\n")
+ site.addsitedir(self.sitedir, set())
+ self.assertIn(overlay, sys.path)
+ self.assertIn(pkgdir, sys.path)
+
+ # gh-149504
+ def test_reentrant_addsitedir_start(self):
+ # As above, but the re-entry happens from a .start entry point
+ # instead of a .pth import line. The entry point execution
+ # phase is vulnerable to the same class of bug.
+ overlay = self.enterContext(os_helper.temp_dir())
+ overlay_pth = os.path.join(overlay, 'overlay.pth')
+ pkgdir = self.enterContext(os_helper.temp_dir())
+ with open(overlay_pth, 'w', encoding='utf-8') as fp:
+ print(pkgdir, file=fp)
+ self._make_mod(f"""\
+import site
+def bootstrap():
+ site.addsitedir({overlay!r})
+""",
+ name='reenter_helper', on_path=True)
+ self._make_start("reenter_helper:bootstrap\n")
+ site.addsitedir(self.sitedir, set())
+ self.assertIn(overlay, sys.path)
+ self.assertIn(pkgdir, sys.path)
+
+
if __name__ == "__main__":
unittest.main()