From: Daan De Meyer Date: Sun, 11 Feb 2024 09:59:57 +0000 (+0100) Subject: tree-wide: Introduce SandboxProtocol X-Git-Tag: v21~59^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=35f7d88a7469e7aa8a7abf5828e5a77288691f63;p=thirdparty%2Fmkosi.git tree-wide: Introduce SandboxProtocol Instead of passing a full sandbox command into the functions from tree.py, archive.py, kmod.py and partition.py, let's instead pass in a function that creates a sandbox, so we can pass in the required options from the functions themselves. This reduces duplication a lot as we don't have to specify all the sandbox options at each callsite. --- diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 72ca3ba48..47cfa1ecb 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -105,11 +105,7 @@ def mount_base_trees(context: Context) -> Iterator[None]: if path.is_dir(): bases += [path] elif path.suffix == ".tar": - extract_tar( - path, d, - tools=context.config.tools(), - sandbox=context.sandbox(options=["--ro-bind", path, path, "--bind", d.parent, d.parent]), - ) + extract_tar(path, d, tools=context.config.tools(), sandbox=context.sandbox) bases += [d] elif path.suffix == ".raw": run(["systemd-dissect", "-M", path, d]) @@ -131,8 +127,7 @@ def remove_files(context: Context) -> None: with complete_step("Removing files…"): for pattern in context.config.remove_files: - rmtree(*context.root.glob(pattern.lstrip("/")), - sandbox=context.sandbox(options=["--bind", context.root, context.root])) + rmtree(*context.root.glob(pattern.lstrip("/")), sandbox=context.sandbox) def install_distribution(context: Context) -> None: @@ -1346,18 +1341,13 @@ def install_tree( preserve=preserve, use_subvolumes=context.config.use_subvolumes, tools=context.config.tools(), - sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", t.parent, t.parent]), + sandbox=context.sandbox, ) if src.is_dir() or (src.is_file() and target): copy() elif src.suffix == ".tar": - extract_tar( - src, t, - tools=context.config.tools(), - # Make sure tar uses user/group information from the root directory instead of the host. - sandbox=context.sandbox(options=["--bind", dst, dst, *finalize_passwd_mounts(dst)]), - ) + extract_tar(src, t, tools=context.config.tools(), sandbox=context.sandbox) elif src.suffix == ".raw": run( ["systemd-dissect", "--copy-from", src, "/", t], @@ -1412,7 +1402,12 @@ def install_package_directories(context: Context) -> None: with complete_step("Copying in extra packages…"): for d in context.config.package_directories: - install_tree(context, d, context.packages) + copy_tree( + d, context.packages, + use_subvolumes=context.config.use_subvolumes, + tools=context.config.tools(), + sandbox=context.sandbox, + ) if context.want_local_repo(): with complete_step("Building local package repository"): @@ -1433,7 +1428,12 @@ def install_build_dest(context: Context) -> None: return with complete_step("Copying in build tree…"): - install_tree(context, context.install_dir, context.root) + copy_tree( + context.install_dir, context.root, + use_subvolumes=context.config.use_subvolumes, + tools=context.config.tools(), + sandbox=context.sandbox, + ) def gzip_binary(context: Context) -> str: @@ -1655,11 +1655,7 @@ def build_microcode_initrd(context: Context) -> Optional[Path]: for p in intel.iterdir(): f.write(p.read_bytes()) - make_cpio( - root, microcode, - tools=context.config.tools(), - sandbox=context.sandbox(options=["--ro-bind", root, root]), - ) + make_cpio(root, microcode, tools=context.config.tools(), sandbox=context.sandbox) return microcode @@ -1676,10 +1672,10 @@ def build_kernel_modules_initrd(context: Context, kver: str) -> Path: include=context.config.kernel_modules_initrd_include, exclude=context.config.kernel_modules_initrd_exclude, host=context.config.kernel_modules_initrd_include_host, - sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]), + sandbox=context.sandbox, ), tools=context.config.tools(), - sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]), + sandbox=context.sandbox, ) # Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that @@ -1966,14 +1962,7 @@ def install_uki(context: Context, partitions: Sequence[Partition]) -> None: def make_uki(context: Context, stub: Path, kver: str, kimg: Path, output: Path) -> None: microcode = build_microcode_initrd(context) - make_cpio( - context.root, context.workspace / "initrd", - tools=context.config.tools(), - sandbox=context.sandbox( - # Make sure cpio uses user/group information from the root directory instead of the host. - options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)], - ), - ) + make_cpio(context.root, context.workspace / "initrd", tools=context.config.tools(), sandbox=context.sandbox) maybe_compress(context, context.config.compress_output, context.workspace / "initrd", context.workspace / "initrd") initrds = [microcode] if microcode else [] @@ -2004,7 +1993,7 @@ def maybe_compress(context: Context, compression: Compression, src: Path, dst: O src, dst, use_subvolumes=context.config.use_subvolumes, tools=context.config.tools(), - sandbox=context.sandbox(options=["--bind", src.parent, src.parent, "--bind", dst.parent, dst.parent]), + sandbox=context.sandbox, ) return @@ -2375,7 +2364,7 @@ def run_depmod(context: Context, *, force: bool = False) -> None: include=context.config.kernel_modules_include, exclude=context.config.kernel_modules_exclude, host=context.config.kernel_modules_include_host, - sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]), + sandbox=context.sandbox, ) with complete_step(f"Running depmod for {kver}"): @@ -2534,7 +2523,7 @@ def save_cache(context: Context) -> None: final, build, manifest = cache_tree_paths(context.config) with complete_step("Installing cache copies"): - rmtree(final, sandbox=context.sandbox(options=["--bind", final.parent, final.parent])) + rmtree(final, sandbox=context.sandbox) # We only use the cache-overlay directory for caching if we have a base tree, otherwise we just # cache the root directory. @@ -2543,37 +2532,22 @@ def save_cache(context: Context) -> None: context.workspace / "cache-overlay", final, use_subvolumes=context.config.use_subvolumes, tools=context.config.tools(), - sandbox=context.sandbox( - options=[ - "--bind", context.workspace, context.workspace, - "--bind", final.parent, final.parent, - ], - ), + sandbox=context.sandbox, ) else: move_tree( context.root, final, use_subvolumes=context.config.use_subvolumes, - sandbox=context.sandbox( - options=[ - "--bind", context.root.parent, context.root.parent, - "--bind", final.parent, final.parent, - ], - ), + sandbox=context.sandbox, ) if need_build_overlay(context.config) and (context.workspace / "build-overlay").exists(): - rmtree(build, sandbox=context.sandbox(options=["--bind", build.parent, build.parent])) + rmtree(build, sandbox=context.sandbox) move_tree( context.workspace / "build-overlay", build, use_subvolumes=context.config.use_subvolumes, tools=context.config.tools(), - sandbox=context.sandbox( - options=[ - "--bind", context.workspace, context.workspace, - "--bind", build.parent, build.parent, - ], - ), + sandbox=context.sandbox, ) manifest.write_text( @@ -2618,7 +2592,13 @@ def reuse_cache(context: Context) -> bool: final, build, _ = cache_tree_paths(context.config) with complete_step("Copying cached trees"): - install_tree(context, final, context.root) + copy_tree( + final, context.root, + use_subvolumes=context.config.use_subvolumes, + tools=context.config.tools(), + sandbox=context.sandbox, + ) + if need_build_overlay(context.config): (context.workspace / "build-overlay").symlink_to(build) @@ -2898,12 +2878,7 @@ def finalize_staging(context: Context) -> None: f, context.config.output_dir_or_cwd(), use_subvolumes=context.config.use_subvolumes, tools=context.config.tools(), - sandbox=context.sandbox( - options=[ - "--bind", context.staging, context.staging, - "--bind", context.config.output_dir_or_cwd(), context.config.output_dir_or_cwd(), - ], - ), + sandbox=context.sandbox, ) @@ -2926,10 +2901,7 @@ def normalize_mtime(root: Path, mtime: Optional[int], directory: Optional[Path] def setup_workspace(args: Args, config: Config) -> Iterator[Path]: with contextlib.ExitStack() as stack: workspace = Path(tempfile.mkdtemp(dir=config.workspace_dir_or_default(), prefix="mkosi-workspace")) - sandbox = config.sandbox( - options=["--bind", config.workspace_dir_or_default(), config.workspace_dir_or_default()], - ) - stack.callback(lambda: rmtree(workspace, sandbox=sandbox)) + stack.callback(lambda: rmtree(workspace, sandbox=config.sandbox)) (workspace / "tmp").mkdir(mode=0o1777) with scopedenv({"TMPDIR" : os.fspath(workspace / "tmp")}): @@ -2975,14 +2947,15 @@ def copy_repository_metadata(context: Context) -> None: with umask(~0o755): dst.mkdir(parents=True, exist_ok=True) + def sandbox(*, options: Sequence[PathString]) -> list[PathString]: + return context.sandbox(options=[*options, *exclude]) + with flock(src): copy_tree( src, dst, tools=context.config.tools(), preserve=False, - sandbox=context.sandbox( - options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *exclude] - ), + sandbox=sandbox, ) @@ -3079,19 +3052,13 @@ def build_image(context: Context) -> None: make_tar( context.root, context.staging / context.config.output_with_format, tools=context.config.tools(), - # Make sure tar uses user/group information from the root directory instead of the host. - sandbox=context.sandbox( - options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)], - ), + sandbox=context.sandbox, ) elif context.config.output_format == OutputFormat.cpio: make_cpio( context.root, context.staging / context.config.output_with_format, tools=context.config.tools(), - # Make sure cpio uses user/group information from the root directory instead of the host. - sandbox=context.sandbox( - options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)], - ), + sandbox=context.sandbox, ) elif context.config.output_format == OutputFormat.uki: assert stub and kver and kimg diff --git a/mkosi/archive.py b/mkosi/archive.py index a681d5a12..d33d0f058 100644 --- a/mkosi/archive.py +++ b/mkosi/archive.py @@ -1,13 +1,13 @@ # SPDX-License-Identifier: LGPL-2.1+ import os -from collections.abc import Iterable, Sequence +from collections.abc import Iterable from pathlib import Path from typing import Optional from mkosi.log import log_step from mkosi.run import find_binary, run -from mkosi.types import PathString +from mkosi.sandbox import SandboxProtocol, finalize_passwd_mounts, nosandbox def tar_binary(*, tools: Path = Path("/")) -> str: @@ -36,7 +36,7 @@ def tar_exclude_apivfs_tmp() -> list[str]: ] -def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: Sequence[PathString] = ()) -> None: +def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: SandboxProtocol = nosandbox) -> None: log_step(f"Creating tar archive {dst}…") with dst.open("wb") as f: @@ -59,7 +59,8 @@ def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: Sequence ".", ], stdout=f, - sandbox=sandbox, + # Make sure tar uses user/group information from the root directory instead of the host. + sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]), ) @@ -69,7 +70,7 @@ def extract_tar( *, log: bool = True, tools: Path = Path("/"), - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> None: if log: log_step(f"Extracting tar archive {src}…") @@ -93,7 +94,10 @@ def extract_tar( *tar_exclude_apivfs_tmp(), ], stdin=f, - sandbox=sandbox, + sandbox=sandbox( + # Make sure tar uses user/group information from the root directory instead of the host. + options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *finalize_passwd_mounts(dst)] + ), ) @@ -103,7 +107,7 @@ def make_cpio( *, files: Optional[Iterable[Path]] = None, tools: Path = Path("/"), - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> None: if not files: files = src.rglob("*") @@ -124,6 +128,5 @@ def make_cpio( ], input="\0".join(os.fspath(f.relative_to(src)) for f in files), stdout=f, - # Make sure cpio uses user/group information from the root directory instead of the host. - sandbox=sandbox, + sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]), ) diff --git a/mkosi/context.py b/mkosi/context.py index 799e00b8e..207d6b723 100644 --- a/mkosi/context.py +++ b/mkosi/context.py @@ -39,7 +39,7 @@ class Context: self.root, use_subvolumes=self.config.use_subvolumes, tools=config.tools(), - sandbox=config.sandbox(options=["--bind", self.workspace, self.workspace]), + sandbox=config.sandbox, ) self.staging.mkdir() diff --git a/mkosi/distributions/centos.py b/mkosi/distributions/centos.py index 9288237fd..12098cf02 100644 --- a/mkosi/distributions/centos.py +++ b/mkosi/distributions/centos.py @@ -27,7 +27,7 @@ def move_rpm_db(context: Context) -> None: if newdb.exists() and not newdb.is_symlink(): with complete_step("Moving rpm database /usr/lib/sysimage/rpm → /var/lib/rpm"): - rmtree(olddb, sandbox=context.sandbox(options=["--bind", olddb.parent, olddb.parent])) + rmtree(olddb, sandbox=context.sandbox) shutil.move(newdb, olddb) newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent)) diff --git a/mkosi/distributions/debian.py b/mkosi/distributions/debian.py index 6d4a232b4..f19eac133 100644 --- a/mkosi/distributions/debian.py +++ b/mkosi/distributions/debian.py @@ -13,7 +13,6 @@ from mkosi.installer import PackageManager from mkosi.installer.apt import Apt from mkosi.log import die from mkosi.run import run -from mkosi.sandbox import finalize_passwd_mounts from mkosi.util import umask @@ -183,10 +182,7 @@ class Installer(DistributionInstaller): Path(o.name), context.root, log=False, tools=context.config.tools(), - # Make sure tar uses user/group information from the root directory instead of the host. - sandbox=context.sandbox( - options=["--bind", context.root, context.root, *finalize_passwd_mounts(context.root)], - ), + sandbox=context.sandbox, ) # Finally, run apt to properly install packages in the chroot without having to worry that maintainer diff --git a/mkosi/installer/__init__.py b/mkosi/installer/__init__.py index d79049745..3e3cc4a50 100644 --- a/mkosi/installer/__init__.py +++ b/mkosi/installer/__init__.py @@ -87,7 +87,7 @@ def clean_package_manager_metadata(context: Context) -> None: dst = context.workspace / "package-cache-dir" / d / subdir dst.mkdir(parents=True, exist_ok=True) - copy_tree(src, dst, sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", dst, dst])) + copy_tree(src, dst, sandbox=context.sandbox) context.package_cache_dir = context.workspace / "package-cache-dir" @@ -103,5 +103,4 @@ def clean_package_manager_metadata(context: Context) -> None: ("dpkg", ["var/lib/dpkg"]), (executable, [f"var/lib/{subdir}", f"var/cache/{subdir}"])): if always or not find_binary(tool, root=context.root): - rmtree(*(context.root / p for p in paths), - sandbox=context.sandbox(options=["--bind", context.root, context.root])) + rmtree(*(context.root / p for p in paths), sandbox=context.sandbox) diff --git a/mkosi/installer/rpm.py b/mkosi/installer/rpm.py index f388382e9..7f16c56fb 100644 --- a/mkosi/installer/rpm.py +++ b/mkosi/installer/rpm.py @@ -60,7 +60,7 @@ def fixup_rpmdb_location(context: Context) -> None: rpmdb = context.root / "usr/lib/sysimage/rpm" if not rpmdb.exists(): rpmdb = context.root / "var/lib/rpm" - rmtree(rpmdb, sandbox=context.sandbox(options=["--bind", rpmdb.parent, rpmdb.parent])) + rmtree(rpmdb, sandbox=context.sandbox) shutil.move(rpmdb_home, rpmdb) rpmdb_home.symlink_to(os.path.relpath(rpmdb, start=rpmdb_home.parent)) diff --git a/mkosi/kmod.py b/mkosi/kmod.py index ee49c25d5..9b2eeeb45 100644 --- a/mkosi/kmod.py +++ b/mkosi/kmod.py @@ -9,7 +9,7 @@ from pathlib import Path from mkosi.log import complete_step, log_step from mkosi.run import run -from mkosi.types import PathString +from mkosi.sandbox import SandboxProtocol, nosandbox def loaded_modules() -> list[str]: @@ -62,7 +62,7 @@ def resolve_module_dependencies( kver: str, modules: Sequence[str], *, - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> tuple[set[Path], set[Path]]: """ Returns a tuple of lists containing the paths to the module and firmware dependencies of the given list @@ -84,8 +84,11 @@ def resolve_module_dependencies( info = "" for i in range(0, len(nametofile.keys()), 8500): chunk = list(nametofile.keys())[i:i+8500] - info += run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk], - stdout=subprocess.PIPE, sandbox=sandbox).stdout.strip() + info += run( + ["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk], + stdout=subprocess.PIPE, + sandbox=sandbox(options=["--ro-bind", root, root]) + ).stdout.strip() log_step("Calculating required kernel modules and firmware") @@ -149,7 +152,7 @@ def gen_required_kernel_modules( include: Sequence[str], exclude: Sequence[str], host: bool, - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> Iterator[Path]: modulesd = root / "usr/lib/modules" / kver modules = filter_kernel_modules(root, kver, include=include, exclude=exclude, host=host) @@ -187,7 +190,7 @@ def process_kernel_modules( include: Sequence[str], exclude: Sequence[str], host: bool, - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> None: if not include and not exclude: return diff --git a/mkosi/partition.py b/mkosi/partition.py index c5146074f..f2fd3cc4a 100644 --- a/mkosi/partition.py +++ b/mkosi/partition.py @@ -7,7 +7,7 @@ from typing import Any, Optional from mkosi.log import die from mkosi.run import run -from mkosi.types import PathString +from mkosi.sandbox import SandboxProtocol, nosandbox @dataclasses.dataclass(frozen=True) @@ -31,10 +31,15 @@ class Partition: GRUB_BOOT_PARTITION_UUID = "21686148-6449-6e6f-744e-656564454649" -def find_partitions(image: Path, *, sandbox: Sequence[PathString]) -> list[Partition]: - output = json.loads(run(["systemd-repart", "--json=short", image], - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, - sandbox=sandbox).stdout) +def find_partitions(image: Path, *, sandbox: SandboxProtocol = nosandbox) -> list[Partition]: + output = json.loads( + run( + ["systemd-repart", "--json=short", image], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + sandbox=sandbox(options=["--ro-bind", image, image]), + ).stdout + ) return [Partition.from_dict(d) for d in output] diff --git a/mkosi/qemu.py b/mkosi/qemu.py index 1e1659abd..40c99c309 100644 --- a/mkosi/qemu.py +++ b/mkosi/qemu.py @@ -451,7 +451,7 @@ def copy_ephemeral(config: Config, src: Path) -> Iterator[Path]: preserve=config.output_format == OutputFormat.directory, use_subvolumes=config.use_subvolumes, tools=config.tools(), - sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]), + sandbox=config.sandbox, ) fork_and_wait(copy) @@ -461,7 +461,7 @@ def copy_ephemeral(config: Config, src: Path) -> Iterator[Path]: if config.output_format == OutputFormat.directory: become_root() - rmtree(tmp, sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent])) + rmtree(tmp, sandbox=config.sandbox) fork_and_wait(rm) @@ -724,9 +724,7 @@ def run_qemu(args: Args, config: Config) -> None: elif config.output_format == OutputFormat.disk: # We can't rely on gpt-auto-generator when direct kernel booting so synthesize a root= # kernel argument instead. - root = finalize_root( - find_partitions(fname, sandbox=config.sandbox(options=["--ro-bind", fname, fname])) - ) + root = finalize_root(find_partitions(fname, sandbox=config.sandbox)) if not root: die("Cannot perform a direct kernel boot without a root or usr partition") diff --git a/mkosi/sandbox.py b/mkosi/sandbox.py index 527bdca46..d95807dd8 100644 --- a/mkosi/sandbox.py +++ b/mkosi/sandbox.py @@ -5,13 +5,21 @@ import os import uuid from collections.abc import Sequence from pathlib import Path -from typing import Optional +from typing import Optional, Protocol from mkosi.types import PathString from mkosi.user import INVOKING_USER from mkosi.util import flatten, one_zero +class SandboxProtocol(Protocol): + def __call__(self, *, options: Sequence[PathString]) -> list[PathString]: ... + + +def nosandbox(*, options: Sequence[PathString]) -> list[PathString]: + return [] + + # https://github.com/torvalds/linux/blob/master/include/uapi/linux/capability.h class Capability(enum.Enum): CAP_NET_ADMIN = 12 diff --git a/mkosi/tree.py b/mkosi/tree.py index 2bee42ef5..e744909dd 100644 --- a/mkosi/tree.py +++ b/mkosi/tree.py @@ -5,21 +5,23 @@ import errno import shutil import subprocess import tempfile -from collections.abc import Iterator, Sequence +from collections.abc import Iterator from pathlib import Path from mkosi.config import ConfigFeature from mkosi.log import die from mkosi.run import find_binary, run +from mkosi.sandbox import SandboxProtocol, nosandbox from mkosi.types import PathString +from mkosi.util import flatten -def statfs(path: Path, *, sandbox: Sequence[PathString] = ()) -> str: +def statfs(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> str: return run(["stat", "--file-system", "--format", "%T", path], - sandbox=sandbox, stdout=subprocess.PIPE).stdout.strip() + sandbox=sandbox(options=["--ro-bind", path, path]), stdout=subprocess.PIPE).stdout.strip() -def is_subvolume(path: Path, *, sandbox: Sequence[PathString] = ()) -> bool: +def is_subvolume(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> bool: return path.is_dir() and statfs(path, sandbox=sandbox) == "btrfs" and path.stat().st_ino == 256 @@ -28,7 +30,7 @@ def make_tree( *, use_subvolumes: ConfigFeature = ConfigFeature.disabled, tools: Path = Path("/"), - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> Path: if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools): die("Subvolumes requested but the btrfs command was not found") @@ -42,7 +44,8 @@ def make_tree( if use_subvolumes != ConfigFeature.disabled and find_binary("btrfs", root=tools) is not None: result = run(["btrfs", "subvolume", "create", path], - sandbox=sandbox, check=use_subvolumes == ConfigFeature.enabled).returncode + sandbox=sandbox(options=["--bind", path.parent, path.parent]), + check=use_subvolumes == ConfigFeature.enabled).returncode else: result = 1 @@ -75,7 +78,7 @@ def copy_tree( dereference: bool = False, use_subvolumes: ConfigFeature = ConfigFeature.disabled, tools: Path = Path("/"), - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox, ) -> Path: subvolume = (use_subvolumes == ConfigFeature.enabled or use_subvolumes == ConfigFeature.auto and find_binary("btrfs", root=tools) is not None) @@ -91,6 +94,7 @@ def copy_tree( "--reflink=auto", src, dst, ] + options: list[PathString] = ["--ro-bind", src, src, "--bind", dst.parent, dst.parent] # If the source and destination are both directories, we want to merge the source directory with the # destination directory. If the source if a file and the destination is a directory, we want to copy @@ -111,7 +115,7 @@ def copy_tree( if not preserve else contextlib.nullcontext() ): - run(copy, sandbox=sandbox) + run(copy, sandbox=sandbox(options=options)) return dst # btrfs can't snapshot to an existing directory so make sure the destination does not exist. @@ -119,21 +123,22 @@ def copy_tree( dst.rmdir() result = run(["btrfs", "subvolume", "snapshot", src, dst], - check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox).returncode + check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox(options=options)).returncode if result != 0: with ( preserve_target_directories_stat(src, dst) if not preserve else contextlib.nullcontext() ): - run(copy, sandbox=sandbox) + run(copy, sandbox=sandbox(options=options)) return dst -def rmtree(*paths: Path, sandbox: Sequence[PathString] = ()) -> None: +def rmtree(*paths: Path, sandbox: SandboxProtocol = nosandbox) -> None: if paths: - run(["rm", "-rf", "--", *paths], sandbox=sandbox) + run(["rm", "-rf", "--", *paths], + sandbox=sandbox(options=flatten(["--bind", p.parent, p.parent] for p in paths))) def move_tree( @@ -142,7 +147,7 @@ def move_tree( *, use_subvolumes: ConfigFeature = ConfigFeature.disabled, tools: Path = Path("/"), - sandbox: Sequence[PathString] = (), + sandbox: SandboxProtocol = nosandbox ) -> Path: if src == dst: return dst