from mkosi.installer import clean_package_manager_metadata, package_manager_scripts
from mkosi.log import complete_step, die, log_step
from mkosi.manifest import Manifest
-from mkosi.mounts import mount_overlay, mount_passwd, mount_tools, scandir_recursive
+from mkosi.mounts import mount_overlay, mount_passwd, mount_tools
from mkosi.pager import page
from mkosi.qemu import copy_ephemeral, machine_cid, run_qemu
from mkosi.run import become_root, bwrap, chroot_cmd, init_mount_namespace, run
archive_tree(state.root, state.staging / state.config.output_with_format)
-def find_files(dir: Path, root: Path) -> Iterator[Path]:
- """Generate a list of all filepaths in directory @dir relative to @root"""
- yield from scandir_recursive(dir,
- lambda entry: Path(entry.path).relative_to(root))
-
-
def make_initrd(state: MkosiState) -> None:
if state.config.output_format != OutputFormat.cpio:
return
- make_cpio(state, find_files(state.root, state.root), state.staging / state.config.output_with_format)
+ make_cpio(state.root, state.root.rglob("*"), state.staging / state.config.output_with_format)
-def make_cpio(state: MkosiState, files: Iterator[Path], output: Path) -> None:
+def make_cpio(root: Path, files: Iterator[Path], output: Path) -> None:
with complete_step(f"Creating cpio {output}…"):
run([
"cpio",
"--null",
"-H", "newc",
"--quiet",
- "-D", state.root,
+ "-D", root,
"-O", output,
- ], input="\0".join(os.fspath(file) for file in files))
+ ], input="\0".join(os.fspath(f.relative_to(root)) for f in files))
def make_directory(state: MkosiState) -> None:
def filter_kernel_modules(root: Path, kver: str, include: Sequence[str], exclude: Sequence[str]) -> list[Path]:
- modulesd = Path("usr/lib/modules") / kver
- modules = set(m.relative_to(root) for m in (root / modulesd).glob("**/*.ko*"))
+ modulesd = root / "usr/lib/modules" / kver
+ modules = set(m for m in (root / modulesd).rglob("*.ko*"))
keep = set()
for pattern in include:
modulesd = Path("usr/lib/modules") / kver
builtin = set(module_path_to_name(Path(m)) for m in (state.root / modulesd / "modules.builtin").read_text().splitlines())
allmodules = set((state.root / modulesd / "kernel").glob("**/*.ko*"))
- nametofile = {module_path_to_name(m): m.relative_to(state.root) for m in allmodules}
+ nametofile = {module_path_to_name(m): m for m in allmodules}
log_step("Running modinfo to fetch kernel module dependencies")
depends += [d for d in value.strip().split(",") if d]
elif key == "firmware":
- firmware += [f.relative_to(state.root) for f in state.root.joinpath("usr/lib/firmware").glob(f"{value.strip()}*")]
+ firmware += [f for f in (state.root / "usr/lib/firmware").glob(f"{value.strip()}*")]
elif key == "name":
name = value.strip()
kmods = state.workspace / f"initramfs-kernel-modules-{kver}.img"
with complete_step(f"Generating kernel modules initrd for kernel {kver}"):
- modulesd = Path("usr/lib/modules") / kver
+ modulesd = state.root / "usr/lib/modules" / kver
modules = filter_kernel_modules(state.root, kver,
state.config.kernel_modules_initrd_include,
state.config.kernel_modules_initrd_exclude)
yield modulesd
yield modulesd / "kernel"
- for d in (modulesd, Path("usr/lib/firmware")):
- for p in (state.root / d).glob("**/*"):
+ for d in (modulesd, state.root / "usr/lib/firmware"):
+ for p in (state.root / d).rglob("*"):
if p.is_dir():
- yield p.relative_to(state.root)
+ yield p
for p in sorted(mods) + sorted(firmware):
yield p
if not p.name.startswith("modules"):
continue
- yield p.relative_to(state.root)
+ yield p
if (state.root / modulesd / "vdso").exists():
yield modulesd / "vdso"
for p in (state.root / modulesd / "vdso").iterdir():
- yield p.relative_to(state.root)
+ yield p
- make_cpio(state, files(), kmods)
+ make_cpio(state.root, files(), kmods)
# Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that
# this is not ideal since the compressed kernel modules will all be decompressed on boot which
names = [module_path_to_name(m) for m in modules]
mods, firmware = resolve_module_dependencies(state, kver, names)
- allmodules = set(m.relative_to(state.root) for m in (state.root / modulesd).glob("**/*.ko*"))
- allfirmware = set(m.relative_to(state.root) for m in (state.root / "usr/lib/firmware").glob("**/*") if not m.is_dir())
+ allmodules = set(m for m in (state.root / modulesd).rglob("*.ko*"))
+ allfirmware = set(m for m in (state.root / "usr/lib/firmware").rglob("*") if not m.is_dir())
for m in allmodules:
if m in mods:
f"user:{uid}:rwx" if allow else f"user:{uid}",
"-"],
# Supply files via stdin so we don't clutter --debug run output too much
- input="\n".join([str(root),
- *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
+ input="\n".join([str(root), *(os.fspath(p) for p in root.rglob("*") if p.is_dir())]),
)
# SPDX-License-Identifier: LGPL-2.1+
-import collections
import contextlib
import os
import platform
import tempfile
from collections.abc import Iterator, Sequence
from pathlib import Path
-from typing import Callable, Deque, Optional, TypeVar, Union, cast
+from typing import Optional, TypeVar
from mkosi.config import GenericVersion, MkosiConfig
from mkosi.log import complete_step
T = TypeVar("T")
-def scandir_recursive(
- root: Path,
- filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
-) -> Iterator[T]:
- """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
- queue: Deque[Union[str, Path]] = collections.deque([root])
-
- while queue:
- for entry in os.scandir(queue.pop()):
- pred = filter(entry) if filter is not None else entry
- if pred is not None:
- yield cast(T, pred)
- if entry.is_dir(follow_symlinks=False):
- queue.append(entry.path)
-
-
def stat_is_whiteout(st: os.stat_result) -> bool:
return stat.S_ISCHR(st.st_mode) and st.st_rdev == 0
"""
with complete_step("Removing overlay whiteout files…"):
- for entry in cast(Iterator[os.DirEntry[str]], scandir_recursive(path)):
- if stat_is_whiteout(entry.stat(follow_symlinks=False)):
- os.unlink(entry.path)
+ for entry in path.rglob("*"):
+ # TODO: Use Path.stat() once we depend on Python 3.10+.
+ if stat_is_whiteout(os.stat(entry, follow_symlinks=False)):
+ entry.unlink()
@contextlib.contextmanager