os.unlink(entry.path)
-def do_mount(
+@contextlib.contextmanager
+def mount(
what: PathString,
where: Path,
+ operation: Optional[str] = None,
options: Sequence[str] = (),
type: Optional[str] = None,
read_only: bool = False,
-) -> None:
+) -> Iterator[Path]:
os.makedirs(where, 0o755, True)
if read_only:
options = ["ro", *options]
- cmd: List[PathString] = ["mount", "-n", what, where]
+ cmd: List[PathString] = ["mount", "--no-mtab"]
+
+ if operation:
+ cmd += [operation]
+
+ cmd += [what, where]
if type:
- cmd += ["-t", type]
+ cmd += ["--types", type]
if options:
- cmd += ["-o", ",".join(options)]
+ cmd += ["--options", ",".join(options)]
- run(cmd)
+ try:
+ run(cmd)
+ yield where
+ finally:
+ run(["umount", "--no-mtab", "--recursive", where])
-def mount_loop(args: MkosiArgs, dev: Path, where: Path, read_only: bool = False) -> None:
+def mount_loop(args: MkosiArgs, dev: Path, where: Path, read_only: bool = False) -> ContextManager[Path]:
options = []
if not args.output_format.is_squashfs():
options += ["discard"]
if compress and args.output_format == OutputFormat.gpt_btrfs and where.name not in {"efi", "boot"}:
options += ["compress" if compress is True else f"compress={compress}"]
- do_mount(dev, where, options, read_only=read_only)
+ return mount(dev, where, options=options, read_only=read_only)
-def mount_bind(what: Path, where: Optional[Path] = None) -> Path:
+def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
if where is None:
where = what
os.makedirs(what, 0o755, True)
os.makedirs(where, 0o755, True)
- run(["mount", "--bind", what, where])
- return where
+ return mount(what, where, operation="--bind")
-def mount_tmpfs(where: Path) -> None:
- do_mount("tmpfs", where, type="tmpfs")
+def mount_tmpfs(where: Path) -> ContextManager[Path]:
+ return mount("tmpfs", where, type="tmpfs")
+@contextlib.contextmanager
def mount_overlay(
- args: MkosiArgs,
base_image: Path, # the path to the mounted base image root
root: Path, # the path to the destination image root
read_only: bool = False,
-) -> Tuple[Path, TempDir]:
+) -> Iterator[Path]:
"""Set up the overlay mount on `root` with `base_image` as the lower layer.
Sadly the overlay cannot be mounted onto the root directly, because the
f'upperdir={realroot}',
f'workdir={workdir.name}']
- do_mount("overlay", realroot, options, type="overlay", read_only=read_only)
- mount_bind(realroot, root)
- return realroot, workdir
-
-
-@complete_step("Cleaning up overlayfs")
-def clean_up_overlay(root: Path, realroot: Path, workdir: TempDir) -> None:
- """Destroy the overlayfs structure set up by `mount_overlay`.
-
- When this function returns, the contents of the root file system have been
- moved into root, and `realroot` and `workdir` are gone.
-
- If `realroot` is set, it means we mounted `root` twice: the first mount is
- the overlayfs mount, and the second is a bind-mount to adjust the location
- one level up. Thus we need unmount twice too; after the first unmount here,
- the image remains mounted at `root`.
- """
-
- umount(root)
- umount(realroot)
-
- workdir.cleanup()
-
- # Let's now move the contents of realroot into root
- for entry in os.scandir(realroot):
- os.rename(realroot / entry.name, root / entry.name)
- realroot.rmdir()
+ try:
+ overlay = mount("overlay", realroot, options=options, type="overlay", read_only=read_only)
+ with workdir, overlay, mount_bind(realroot, root):
+ yield root
+ finally:
+ with complete_step("Cleaning up overlayfs"):
+ # Let's now move the contents of realroot into root
+ for entry in os.scandir(realroot):
+ os.rename(realroot / entry.name, root / entry.name)
+ realroot.rmdir()
- delete_whiteout_files(root)
+ delete_whiteout_files(root)
@contextlib.contextmanager
def mount_image(
args: MkosiArgs,
root: Path,
+ do_run_build_script: bool,
+ cached: bool,
base_image: Optional[Path], # the path to the mounted base image root
loopdev: Optional[Path],
image: LuksSetupOutput,
root_read_only: bool = False,
) -> Iterator[None]:
- with complete_step("Mounting image…"):
-
- realroot: Optional[Path] = None
- workdir: Optional[TempDir] = None
+ with complete_step("Mounting image…", "Unmounting image…"), contextlib.ExitStack() as stack:
if base_image is not None:
- mount_bind(root)
- realroot, workdir = mount_overlay(args, base_image, root, root_read_only)
+ stack.enter_context(mount_bind(root))
+ stack.enter_context(mount_overlay(base_image, root, root_read_only))
elif image.root is not None:
if args.usr_only:
# In UsrOnly mode let's have a bind mount at the top so that umount --recursive works nicely later
- mount_bind(root)
- mount_loop(args, image.root, root / "usr", root_read_only)
+ stack.enter_context(mount_bind(root))
+ stack.enter_context(mount_loop(args, image.root, root / "usr", root_read_only))
else:
- mount_loop(args, image.root, root, root_read_only)
+ stack.enter_context(mount_loop(args, image.root, root, root_read_only))
else:
# always have a root of the tree as a mount point so we can
# recursively unmount anything that ends up mounted there
- mount_bind(root)
+ stack.enter_context(mount_bind(root))
if image.home is not None:
- mount_loop(args, image.home, root / "home")
+ stack.enter_context(mount_loop(args, image.home, root / "home"))
if image.srv is not None:
- mount_loop(args, image.srv, root / "srv")
+ stack.enter_context(mount_loop(args, image.srv, root / "srv"))
if image.var is not None:
- mount_loop(args, image.var, root / "var")
+ stack.enter_context(mount_loop(args, image.var, root / "var"))
if image.tmp is not None:
- mount_loop(args, image.tmp, root / "var/tmp")
+ stack.enter_context(mount_loop(args, image.tmp, root / "var/tmp"))
if loopdev is not None:
assert args.partition_table is not None
path = args.partition_table.partition_path(PartitionIdentifier.esp, loopdev)
if path:
- mount_loop(args, path, root / "efi")
+ stack.enter_context(mount_loop(args, path, root / "efi"))
path = args.partition_table.partition_path(PartitionIdentifier.xbootldr, loopdev)
if path:
- mount_loop(args, path, root / "boot")
+ stack.enter_context(mount_loop(args, path, root / "boot"))
# Make sure /tmp and /run are not part of the image
- mount_tmpfs(root / "run")
- mount_tmpfs(root / "tmp")
+ stack.enter_context(mount_tmpfs(root / "run"))
+ stack.enter_context(mount_tmpfs(root / "tmp"))
- try:
- yield
- finally:
- if realroot is not None:
- assert workdir is not None
- clean_up_overlay(root, realroot, workdir)
+ if do_run_build_script and args.include_dir and not cached:
+ stack.enter_context(mount_bind(args.include_dir, root / "usr/include"))
- with complete_step("Unmounting image"):
- umount(root)
+ yield
def install_etc_locale(args: MkosiArgs, root: Path, cached: bool) -> None:
def mount_api_vfs(args: MkosiArgs, root: Path) -> Iterator[None]:
subdirs = ("proc", "dev", "sys")
- with complete_step("Mounting API VFS"):
+ with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
for subdir in subdirs:
- mount_bind(Path("/") / subdir, root / subdir)
- try:
+ stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
+
yield
- finally:
- with complete_step("Unmounting API VFS"):
- for subdir in subdirs:
- umount(root / subdir)
@contextlib.contextmanager
yield
return
- caches = []
-
# We can't do this in mount_image() yet, as /var itself might have to be created as a subvolume first
- with complete_step("Mounting Package Cache"):
+ with complete_step("Mounting Package Cache", "Unmounting Package Cache"), contextlib.ExitStack() as stack:
if args.distribution in (Distribution.fedora, Distribution.mageia, Distribution.openmandriva):
- caches = [mount_bind(args.cache_path, root / "var/cache/dnf")]
+ stack.enter_context(mount_bind(args.cache_path, root / "var/cache/dnf"))
elif is_centos_variant(args.distribution):
# We mount both the YUM and the DNF cache in this case, as
# YUM might just be redirected to DNF even if we invoke
# the former
- caches = [
- mount_bind(args.cache_path / "yum", root / "var/cache/yum"),
- mount_bind(args.cache_path / "dnf", root / "var/cache/dnf"),
- ]
+ stack.enter_context(mount_bind(args.cache_path / "yum", root / "var/cache/yum"))
+ stack.enter_context(mount_bind(args.cache_path / "dnf", root / "var/cache/dnf"))
elif args.distribution in (Distribution.debian, Distribution.ubuntu):
- caches = [mount_bind(args.cache_path, root / "var/cache/apt/archives")]
+ stack.enter_context(mount_bind(args.cache_path, root / "var/cache/apt/archives"))
elif args.distribution == Distribution.arch:
- caches = [mount_bind(args.cache_path, root / "var/cache/pacman/pkg")]
+ stack.enter_context(mount_bind(args.cache_path, root / "var/cache/pacman/pkg"))
elif args.distribution == Distribution.gentoo:
- caches = [mount_bind(args.cache_path, root / "var/cache/binpkgs")]
+ stack.enter_context(mount_bind(args.cache_path, root / "var/cache/binpkgs"))
elif args.distribution == Distribution.opensuse:
- caches = [mount_bind(args.cache_path, root / "var/cache/zypp/packages")]
- try:
- yield
- finally:
- with complete_step("Unmounting Package Cache"):
- for d in caches: # NOQA: E501
- umount(d)
+ stack.enter_context(mount_bind(args.cache_path, root / "var/cache/zypp/packages"))
-
-def umount(where: Path) -> None:
- run(["umount", "--recursive", "-n", where])
+ yield
def configure_dracut(args: MkosiArgs, root: Path, do_run_build_script: bool, cached: bool) -> None:
if do_run_build_script:
root_home(args, root).joinpath("dest").mkdir(mode=0o755)
- if args.include_dir is not None:
- root.joinpath("usr").mkdir(mode=0o755)
- root.joinpath("usr/include").mkdir(mode=0o755)
-
if args.build_dir is not None:
root_home(args, root).joinpath("build").mkdir(0o755)
# dir if we still have to generate the root image here
prepare_tree_root(args, root)
- with mount_image(args, root, base_image, loopdev, encrypted.without_generated_root(args)):
+ with mount_image(args, root, do_run_build_script, cached, base_image, loopdev, encrypted.without_generated_root(args)):
prepare_tree(args, root, do_run_build_script, cached)
- if do_run_build_script and args.include_dir and not cached:
- empty_directory(args.include_dir)
- # We do a recursive unmount of root so we don't need to explicitly unmount this mount
- # later.
- mount_bind(args.include_dir, root / "usr/include")
-
cached_tree = reuse_cache_tree(args, root, do_run_build_script, for_cache, cached)
install_skeleton_trees(args, root, cached_tree)
install_distribution(args, root, do_run_build_script, cached_tree)
# This time we mount read-only, as we already generated
# the verity data, and hence really shouldn't modify the
# image anymore.
- mount = lambda: mount_image(args, root, base_image, loopdev,
+ mount = lambda: mount_image(args, root, do_run_build_script, cached, base_image, loopdev,
encrypted.without_generated_root(args),
root_read_only=True)