]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
tree-wide: Introduce SandboxProtocol
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Sun, 11 Feb 2024 09:59:57 +0000 (10:59 +0100)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Sun, 11 Feb 2024 10:01:27 +0000 (11:01 +0100)
Instead of passing a full sandbox command into the functions from
tree.py, archive.py, kmod.py and partition.py, let's instead pass
in a function that creates a sandbox, so we can pass in the required
options from the functions themselves. This reduces duplication a lot
as we don't have to specify all the sandbox options at each callsite.

12 files changed:
mkosi/__init__.py
mkosi/archive.py
mkosi/context.py
mkosi/distributions/centos.py
mkosi/distributions/debian.py
mkosi/installer/__init__.py
mkosi/installer/rpm.py
mkosi/kmod.py
mkosi/partition.py
mkosi/qemu.py
mkosi/sandbox.py
mkosi/tree.py

index 72ca3ba48f77174788f8627c40360494dffaed21..47cfa1ecb9a829738403687c064441cc4ef4d2a4 100644 (file)
@@ -105,11 +105,7 @@ def mount_base_trees(context: Context) -> Iterator[None]:
             if path.is_dir():
                 bases += [path]
             elif path.suffix == ".tar":
-                extract_tar(
-                    path, d,
-                    tools=context.config.tools(),
-                    sandbox=context.sandbox(options=["--ro-bind", path, path, "--bind", d.parent, d.parent]),
-                )
+                extract_tar(path, d, tools=context.config.tools(), sandbox=context.sandbox)
                 bases += [d]
             elif path.suffix == ".raw":
                 run(["systemd-dissect", "-M", path, d])
@@ -131,8 +127,7 @@ def remove_files(context: Context) -> None:
 
     with complete_step("Removing files…"):
         for pattern in context.config.remove_files:
-            rmtree(*context.root.glob(pattern.lstrip("/")),
-                   sandbox=context.sandbox(options=["--bind", context.root, context.root]))
+            rmtree(*context.root.glob(pattern.lstrip("/")), sandbox=context.sandbox)
 
 
 def install_distribution(context: Context) -> None:
@@ -1346,18 +1341,13 @@ def install_tree(
             preserve=preserve,
             use_subvolumes=context.config.use_subvolumes,
             tools=context.config.tools(),
-            sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", t.parent, t.parent]),
+            sandbox=context.sandbox,
         )
 
     if src.is_dir() or (src.is_file() and target):
         copy()
     elif src.suffix == ".tar":
-        extract_tar(
-            src, t,
-            tools=context.config.tools(),
-            # Make sure tar uses user/group information from the root directory instead of the host.
-            sandbox=context.sandbox(options=["--bind", dst, dst, *finalize_passwd_mounts(dst)]),
-        )
+        extract_tar(src, t, tools=context.config.tools(), sandbox=context.sandbox)
     elif src.suffix == ".raw":
         run(
             ["systemd-dissect", "--copy-from", src, "/", t],
@@ -1412,7 +1402,12 @@ def install_package_directories(context: Context) -> None:
 
     with complete_step("Copying in extra packages…"):
         for d in context.config.package_directories:
-            install_tree(context, d, context.packages)
+            copy_tree(
+                d, context.packages,
+                use_subvolumes=context.config.use_subvolumes,
+                tools=context.config.tools(),
+                sandbox=context.sandbox,
+            )
 
     if context.want_local_repo():
         with complete_step("Building local package repository"):
@@ -1433,7 +1428,12 @@ def install_build_dest(context: Context) -> None:
         return
 
     with complete_step("Copying in build tree…"):
-        install_tree(context, context.install_dir, context.root)
+        copy_tree(
+            context.install_dir, context.root,
+            use_subvolumes=context.config.use_subvolumes,
+            tools=context.config.tools(),
+            sandbox=context.sandbox,
+        )
 
 
 def gzip_binary(context: Context) -> str:
@@ -1655,11 +1655,7 @@ def build_microcode_initrd(context: Context) -> Optional[Path]:
                 for p in intel.iterdir():
                     f.write(p.read_bytes())
 
-    make_cpio(
-        root, microcode,
-        tools=context.config.tools(),
-        sandbox=context.sandbox(options=["--ro-bind", root, root]),
-    )
+    make_cpio(root, microcode, tools=context.config.tools(), sandbox=context.sandbox)
 
     return microcode
 
@@ -1676,10 +1672,10 @@ def build_kernel_modules_initrd(context: Context, kver: str) -> Path:
             include=context.config.kernel_modules_initrd_include,
             exclude=context.config.kernel_modules_initrd_exclude,
             host=context.config.kernel_modules_initrd_include_host,
-            sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+            sandbox=context.sandbox,
         ),
         tools=context.config.tools(),
-        sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+        sandbox=context.sandbox,
     )
 
     # Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that
@@ -1966,14 +1962,7 @@ def install_uki(context: Context, partitions: Sequence[Partition]) -> None:
 
 def make_uki(context: Context, stub: Path, kver: str, kimg: Path, output: Path) -> None:
     microcode = build_microcode_initrd(context)
-    make_cpio(
-        context.root, context.workspace / "initrd",
-        tools=context.config.tools(),
-        sandbox=context.sandbox(
-            # Make sure cpio uses user/group information from the root directory instead of the host.
-            options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
-        ),
-    )
+    make_cpio(context.root, context.workspace / "initrd", tools=context.config.tools(), sandbox=context.sandbox)
     maybe_compress(context, context.config.compress_output, context.workspace / "initrd", context.workspace / "initrd")
 
     initrds = [microcode] if microcode else []
@@ -2004,7 +1993,7 @@ def maybe_compress(context: Context, compression: Compression, src: Path, dst: O
                 src, dst,
                 use_subvolumes=context.config.use_subvolumes,
                 tools=context.config.tools(),
-                sandbox=context.sandbox(options=["--bind", src.parent, src.parent, "--bind", dst.parent, dst.parent]),
+                sandbox=context.sandbox,
             )
         return
 
@@ -2375,7 +2364,7 @@ def run_depmod(context: Context, *, force: bool = False) -> None:
             include=context.config.kernel_modules_include,
             exclude=context.config.kernel_modules_exclude,
             host=context.config.kernel_modules_include_host,
-            sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+            sandbox=context.sandbox,
         )
 
         with complete_step(f"Running depmod for {kver}"):
@@ -2534,7 +2523,7 @@ def save_cache(context: Context) -> None:
     final, build, manifest = cache_tree_paths(context.config)
 
     with complete_step("Installing cache copies"):
-        rmtree(final, sandbox=context.sandbox(options=["--bind", final.parent, final.parent]))
+        rmtree(final, sandbox=context.sandbox)
 
         # We only use the cache-overlay directory for caching if we have a base tree, otherwise we just
         # cache the root directory.
@@ -2543,37 +2532,22 @@ def save_cache(context: Context) -> None:
                 context.workspace / "cache-overlay", final,
                 use_subvolumes=context.config.use_subvolumes,
                 tools=context.config.tools(),
-                sandbox=context.sandbox(
-                    options=[
-                        "--bind", context.workspace, context.workspace,
-                        "--bind", final.parent, final.parent,
-                    ],
-                ),
+                sandbox=context.sandbox,
             )
         else:
             move_tree(
                 context.root, final,
                 use_subvolumes=context.config.use_subvolumes,
-                sandbox=context.sandbox(
-                    options=[
-                        "--bind", context.root.parent, context.root.parent,
-                        "--bind", final.parent, final.parent,
-                    ],
-                ),
+                sandbox=context.sandbox,
             )
 
         if need_build_overlay(context.config) and (context.workspace / "build-overlay").exists():
-            rmtree(build, sandbox=context.sandbox(options=["--bind", build.parent, build.parent]))
+            rmtree(build, sandbox=context.sandbox)
             move_tree(
                 context.workspace / "build-overlay", build,
                 use_subvolumes=context.config.use_subvolumes,
                 tools=context.config.tools(),
-                sandbox=context.sandbox(
-                    options=[
-                        "--bind", context.workspace, context.workspace,
-                        "--bind", build.parent, build.parent,
-                    ],
-                ),
+                sandbox=context.sandbox,
             )
 
         manifest.write_text(
@@ -2618,7 +2592,13 @@ def reuse_cache(context: Context) -> bool:
     final, build, _ = cache_tree_paths(context.config)
 
     with complete_step("Copying cached trees"):
-        install_tree(context, final, context.root)
+        copy_tree(
+            final, context.root,
+            use_subvolumes=context.config.use_subvolumes,
+            tools=context.config.tools(),
+            sandbox=context.sandbox,
+        )
+
         if need_build_overlay(context.config):
             (context.workspace / "build-overlay").symlink_to(build)
 
@@ -2898,12 +2878,7 @@ def finalize_staging(context: Context) -> None:
             f, context.config.output_dir_or_cwd(),
             use_subvolumes=context.config.use_subvolumes,
             tools=context.config.tools(),
-            sandbox=context.sandbox(
-                options=[
-                    "--bind", context.staging, context.staging,
-                    "--bind", context.config.output_dir_or_cwd(), context.config.output_dir_or_cwd(),
-                ],
-            ),
+            sandbox=context.sandbox,
         )
 
 
@@ -2926,10 +2901,7 @@ def normalize_mtime(root: Path, mtime: Optional[int], directory: Optional[Path]
 def setup_workspace(args: Args, config: Config) -> Iterator[Path]:
     with contextlib.ExitStack() as stack:
         workspace = Path(tempfile.mkdtemp(dir=config.workspace_dir_or_default(), prefix="mkosi-workspace"))
-        sandbox = config.sandbox(
-            options=["--bind", config.workspace_dir_or_default(), config.workspace_dir_or_default()],
-        )
-        stack.callback(lambda: rmtree(workspace, sandbox=sandbox))
+        stack.callback(lambda: rmtree(workspace, sandbox=config.sandbox))
         (workspace / "tmp").mkdir(mode=0o1777)
 
         with scopedenv({"TMPDIR" : os.fspath(workspace / "tmp")}):
@@ -2975,14 +2947,15 @@ def copy_repository_metadata(context: Context) -> None:
             with umask(~0o755):
                 dst.mkdir(parents=True, exist_ok=True)
 
+            def sandbox(*, options: Sequence[PathString]) -> list[PathString]:
+                return context.sandbox(options=[*options, *exclude])
+
             with flock(src):
                 copy_tree(
                     src, dst,
                     tools=context.config.tools(),
                     preserve=False,
-                    sandbox=context.sandbox(
-                        options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *exclude]
-                    ),
+                    sandbox=sandbox,
                 )
 
 
@@ -3079,19 +3052,13 @@ def build_image(context: Context) -> None:
         make_tar(
             context.root, context.staging / context.config.output_with_format,
             tools=context.config.tools(),
-            # Make sure tar uses user/group information from the root directory instead of the host.
-            sandbox=context.sandbox(
-                options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
-            ),
+            sandbox=context.sandbox,
         )
     elif context.config.output_format == OutputFormat.cpio:
         make_cpio(
             context.root, context.staging / context.config.output_with_format,
             tools=context.config.tools(),
-            # Make sure cpio uses user/group information from the root directory instead of the host.
-            sandbox=context.sandbox(
-                options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
-            ),
+            sandbox=context.sandbox,
         )
     elif context.config.output_format == OutputFormat.uki:
         assert stub and kver and kimg
index a681d5a1240ec767cae516d53c3e669194e092fa..d33d0f05804fb0f5a924f719668bda80fc676ef6 100644 (file)
@@ -1,13 +1,13 @@
 # SPDX-License-Identifier: LGPL-2.1+
 
 import os
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable
 from pathlib import Path
 from typing import Optional
 
 from mkosi.log import log_step
 from mkosi.run import find_binary, run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, finalize_passwd_mounts, nosandbox
 
 
 def tar_binary(*, tools: Path = Path("/")) -> str:
@@ -36,7 +36,7 @@ def tar_exclude_apivfs_tmp() -> list[str]:
     ]
 
 
-def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: Sequence[PathString] = ()) -> None:
+def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: SandboxProtocol = nosandbox) -> None:
     log_step(f"Creating tar archive {dst}…")
 
     with dst.open("wb") as f:
@@ -59,7 +59,8 @@ def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: Sequence
                 ".",
             ],
             stdout=f,
-            sandbox=sandbox,
+            # Make sure tar uses user/group information from the root directory instead of the host.
+            sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]),
         )
 
 
@@ -69,7 +70,7 @@ def extract_tar(
     *,
     log: bool = True,
     tools: Path = Path("/"),
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> None:
     if log:
         log_step(f"Extracting tar archive {src}…")
@@ -93,7 +94,10 @@ def extract_tar(
                 *tar_exclude_apivfs_tmp(),
             ],
             stdin=f,
-            sandbox=sandbox,
+            sandbox=sandbox(
+                # Make sure tar uses user/group information from the root directory instead of the host.
+                options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *finalize_passwd_mounts(dst)]
+            ),
         )
 
 
@@ -103,7 +107,7 @@ def make_cpio(
     *,
     files: Optional[Iterable[Path]] = None,
     tools: Path = Path("/"),
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> None:
     if not files:
         files = src.rglob("*")
@@ -124,6 +128,5 @@ def make_cpio(
             ],
             input="\0".join(os.fspath(f.relative_to(src)) for f in files),
             stdout=f,
-            # Make sure cpio uses user/group information from the root directory instead of the host.
-            sandbox=sandbox,
+            sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]),
         )
index 799e00b8e40a0db6c569b4cbe1d6405ebb7ee9dc..207d6b72303ebc2f889ed46aafd350ec01faffdb 100644 (file)
@@ -39,7 +39,7 @@ class Context:
                     self.root,
                     use_subvolumes=self.config.use_subvolumes,
                     tools=config.tools(),
-                    sandbox=config.sandbox(options=["--bind", self.workspace, self.workspace]),
+                    sandbox=config.sandbox,
                 )
 
         self.staging.mkdir()
index 9288237fde2df1c03e533355ddc449691d2b6d53..12098cf0256f878db7c2b6aa94dcf015dad4da69 100644 (file)
@@ -27,7 +27,7 @@ def move_rpm_db(context: Context) -> None:
 
     if newdb.exists() and not newdb.is_symlink():
         with complete_step("Moving rpm database /usr/lib/sysimage/rpm → /var/lib/rpm"):
-            rmtree(olddb, sandbox=context.sandbox(options=["--bind", olddb.parent, olddb.parent]))
+            rmtree(olddb, sandbox=context.sandbox)
             shutil.move(newdb, olddb)
 
             newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent))
index 6d4a232b4fe3de6c3adc5f02f01d937b4ba340bb..f19eac133ef7078805e24e1a4055f9ccab0c2671 100644 (file)
@@ -13,7 +13,6 @@ from mkosi.installer import PackageManager
 from mkosi.installer.apt import Apt
 from mkosi.log import die
 from mkosi.run import run
-from mkosi.sandbox import finalize_passwd_mounts
 from mkosi.util import umask
 
 
@@ -183,10 +182,7 @@ class Installer(DistributionInstaller):
                     Path(o.name), context.root,
                     log=False,
                     tools=context.config.tools(),
-                    # Make sure tar uses user/group information from the root directory instead of the host.
-                    sandbox=context.sandbox(
-                        options=["--bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
-                    ),
+                    sandbox=context.sandbox,
                 )
 
         # Finally, run apt to properly install packages in the chroot without having to worry that maintainer
index d79049745785b7ae6be2236804394907d1412bbd..3e3cc4a5021f72aa58991fadec6e3a49c62c658f 100644 (file)
@@ -87,7 +87,7 @@ def clean_package_manager_metadata(context: Context) -> None:
             dst = context.workspace / "package-cache-dir" / d / subdir
             dst.mkdir(parents=True, exist_ok=True)
 
-            copy_tree(src, dst, sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", dst, dst]))
+            copy_tree(src, dst, sandbox=context.sandbox)
 
         context.package_cache_dir = context.workspace / "package-cache-dir"
 
@@ -103,5 +103,4 @@ def clean_package_manager_metadata(context: Context) -> None:
                         ("dpkg",     ["var/lib/dpkg"]),
                         (executable, [f"var/lib/{subdir}", f"var/cache/{subdir}"])):
         if always or not find_binary(tool, root=context.root):
-            rmtree(*(context.root / p for p in paths),
-                   sandbox=context.sandbox(options=["--bind", context.root, context.root]))
+            rmtree(*(context.root / p for p in paths), sandbox=context.sandbox)
index f388382e90d73ffb74dc566c40301c3bff73af9e..7f16c56fb50e017f17a42ecccfc3f1e9960c15d0 100644 (file)
@@ -60,7 +60,7 @@ def fixup_rpmdb_location(context: Context) -> None:
     rpmdb = context.root / "usr/lib/sysimage/rpm"
     if not rpmdb.exists():
         rpmdb = context.root / "var/lib/rpm"
-    rmtree(rpmdb, sandbox=context.sandbox(options=["--bind", rpmdb.parent, rpmdb.parent]))
+    rmtree(rpmdb, sandbox=context.sandbox)
     shutil.move(rpmdb_home, rpmdb)
     rpmdb_home.symlink_to(os.path.relpath(rpmdb, start=rpmdb_home.parent))
 
index ee49c25d56b9cd371dfeb8f00a32c852afa5b62c..9b2eeeb45eba79477a665adbba1a5e88ee565e0e 100644 (file)
@@ -9,7 +9,7 @@ from pathlib import Path
 
 from mkosi.log import complete_step, log_step
 from mkosi.run import run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, nosandbox
 
 
 def loaded_modules() -> list[str]:
@@ -62,7 +62,7 @@ def resolve_module_dependencies(
     kver: str,
     modules: Sequence[str],
     *,
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> tuple[set[Path], set[Path]]:
     """
     Returns a tuple of lists containing the paths to the module and firmware dependencies of the given list
@@ -84,8 +84,11 @@ def resolve_module_dependencies(
     info = ""
     for i in range(0, len(nametofile.keys()), 8500):
         chunk = list(nametofile.keys())[i:i+8500]
-        info += run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk],
-                    stdout=subprocess.PIPE, sandbox=sandbox).stdout.strip()
+        info += run(
+            ["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk],
+            stdout=subprocess.PIPE,
+            sandbox=sandbox(options=["--ro-bind", root, root])
+        ).stdout.strip()
 
     log_step("Calculating required kernel modules and firmware")
 
@@ -149,7 +152,7 @@ def gen_required_kernel_modules(
     include: Sequence[str],
     exclude: Sequence[str],
     host: bool,
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> Iterator[Path]:
     modulesd = root / "usr/lib/modules" / kver
     modules = filter_kernel_modules(root, kver, include=include, exclude=exclude, host=host)
@@ -187,7 +190,7 @@ def process_kernel_modules(
     include: Sequence[str],
     exclude: Sequence[str],
     host: bool,
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> None:
     if not include and not exclude:
         return
index c5146074f810d249b522362f9755cc74eef314d4..f2fd3cc4a4a4e5c1fe3361661330dedb0af9011f 100644 (file)
@@ -7,7 +7,7 @@ from typing import Any, Optional
 
 from mkosi.log import die
 from mkosi.run import run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, nosandbox
 
 
 @dataclasses.dataclass(frozen=True)
@@ -31,10 +31,15 @@ class Partition:
     GRUB_BOOT_PARTITION_UUID = "21686148-6449-6e6f-744e-656564454649"
 
 
-def find_partitions(image: Path, *, sandbox: Sequence[PathString]) -> list[Partition]:
-    output = json.loads(run(["systemd-repart", "--json=short", image],
-                            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
-                            sandbox=sandbox).stdout)
+def find_partitions(image: Path, *, sandbox: SandboxProtocol = nosandbox) -> list[Partition]:
+    output = json.loads(
+        run(
+            ["systemd-repart", "--json=short", image],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.DEVNULL,
+            sandbox=sandbox(options=["--ro-bind", image, image]),
+        ).stdout
+    )
     return [Partition.from_dict(d) for d in output]
 
 
index 1e1659abdcb9ebf4f258c681bb88a2a9d643212c..40c99c3091583bb462286530b5507a8b32c25051 100644 (file)
@@ -451,7 +451,7 @@ def copy_ephemeral(config: Config, src: Path) -> Iterator[Path]:
                 preserve=config.output_format == OutputFormat.directory,
                 use_subvolumes=config.use_subvolumes,
                 tools=config.tools(),
-                sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]),
+                sandbox=config.sandbox,
             )
 
         fork_and_wait(copy)
@@ -461,7 +461,7 @@ def copy_ephemeral(config: Config, src: Path) -> Iterator[Path]:
             if config.output_format == OutputFormat.directory:
                 become_root()
 
-            rmtree(tmp, sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]))
+            rmtree(tmp, sandbox=config.sandbox)
 
         fork_and_wait(rm)
 
@@ -724,9 +724,7 @@ def run_qemu(args: Args, config: Config) -> None:
             elif config.output_format == OutputFormat.disk:
                 # We can't rely on gpt-auto-generator when direct kernel booting so synthesize a root=
                 # kernel argument instead.
-                root = finalize_root(
-                    find_partitions(fname, sandbox=config.sandbox(options=["--ro-bind", fname, fname]))
-                )
+                root = finalize_root(find_partitions(fname, sandbox=config.sandbox))
                 if not root:
                     die("Cannot perform a direct kernel boot without a root or usr partition")
 
index 527bdca46811dab41c2ff8c858e49a8de2a68b2f..d95807dd897c005350c06f47b47cc8f6e3468ea5 100644 (file)
@@ -5,13 +5,21 @@ import os
 import uuid
 from collections.abc import Sequence
 from pathlib import Path
-from typing import Optional
+from typing import Optional, Protocol
 
 from mkosi.types import PathString
 from mkosi.user import INVOKING_USER
 from mkosi.util import flatten, one_zero
 
 
+class SandboxProtocol(Protocol):
+    def __call__(self, *, options: Sequence[PathString]) -> list[PathString]: ...
+
+
+def nosandbox(*, options: Sequence[PathString]) -> list[PathString]:
+    return []
+
+
 # https://github.com/torvalds/linux/blob/master/include/uapi/linux/capability.h
 class Capability(enum.Enum):
     CAP_NET_ADMIN = 12
index 2bee42ef564facae8acaf76faf86d8fc4cf2224e..e744909dddac2840ad23c56bf04787665ea6741a 100644 (file)
@@ -5,21 +5,23 @@ import errno
 import shutil
 import subprocess
 import tempfile
-from collections.abc import Iterator, Sequence
+from collections.abc import Iterator
 from pathlib import Path
 
 from mkosi.config import ConfigFeature
 from mkosi.log import die
 from mkosi.run import find_binary, run
+from mkosi.sandbox import SandboxProtocol, nosandbox
 from mkosi.types import PathString
+from mkosi.util import flatten
 
 
-def statfs(path: Path, *, sandbox: Sequence[PathString] = ()) -> str:
+def statfs(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> str:
     return run(["stat", "--file-system", "--format", "%T", path],
-               sandbox=sandbox, stdout=subprocess.PIPE).stdout.strip()
+               sandbox=sandbox(options=["--ro-bind", path, path]), stdout=subprocess.PIPE).stdout.strip()
 
 
-def is_subvolume(path: Path, *, sandbox: Sequence[PathString] = ()) -> bool:
+def is_subvolume(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> bool:
     return path.is_dir() and statfs(path, sandbox=sandbox) == "btrfs" and path.stat().st_ino == 256
 
 
@@ -28,7 +30,7 @@ def make_tree(
     *,
     use_subvolumes: ConfigFeature = ConfigFeature.disabled,
     tools: Path = Path("/"),
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> Path:
     if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
         die("Subvolumes requested but the btrfs command was not found")
@@ -42,7 +44,8 @@ def make_tree(
 
     if use_subvolumes != ConfigFeature.disabled and find_binary("btrfs", root=tools) is not None:
         result = run(["btrfs", "subvolume", "create", path],
-                     sandbox=sandbox, check=use_subvolumes == ConfigFeature.enabled).returncode
+                     sandbox=sandbox(options=["--bind", path.parent, path.parent]),
+                     check=use_subvolumes == ConfigFeature.enabled).returncode
     else:
         result = 1
 
@@ -75,7 +78,7 @@ def copy_tree(
     dereference: bool = False,
     use_subvolumes: ConfigFeature = ConfigFeature.disabled,
     tools: Path = Path("/"),
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox,
 ) -> Path:
     subvolume = (use_subvolumes == ConfigFeature.enabled or
                  use_subvolumes == ConfigFeature.auto and find_binary("btrfs", root=tools) is not None)
@@ -91,6 +94,7 @@ def copy_tree(
         "--reflink=auto",
         src, dst,
     ]
+    options: list[PathString] = ["--ro-bind", src, src, "--bind", dst.parent, dst.parent]
 
     # If the source and destination are both directories, we want to merge the source directory with the
     # destination directory. If the source if a file and the destination is a directory, we want to copy
@@ -111,7 +115,7 @@ def copy_tree(
             if not preserve
             else contextlib.nullcontext()
         ):
-            run(copy, sandbox=sandbox)
+            run(copy, sandbox=sandbox(options=options))
         return dst
 
     # btrfs can't snapshot to an existing directory so make sure the destination does not exist.
@@ -119,21 +123,22 @@ def copy_tree(
         dst.rmdir()
 
     result = run(["btrfs", "subvolume", "snapshot", src, dst],
-                 check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox).returncode
+                 check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox(options=options)).returncode
     if result != 0:
         with (
             preserve_target_directories_stat(src, dst)
             if not preserve
             else contextlib.nullcontext()
         ):
-            run(copy, sandbox=sandbox)
+            run(copy, sandbox=sandbox(options=options))
 
     return dst
 
 
-def rmtree(*paths: Path, sandbox: Sequence[PathString] = ()) -> None:
+def rmtree(*paths: Path, sandbox: SandboxProtocol = nosandbox) -> None:
     if paths:
-        run(["rm", "-rf", "--", *paths], sandbox=sandbox)
+        run(["rm", "-rf", "--", *paths],
+            sandbox=sandbox(options=flatten(["--bind", p.parent, p.parent] for p in paths)))
 
 
 def move_tree(
@@ -142,7 +147,7 @@ def move_tree(
     *,
     use_subvolumes: ConfigFeature = ConfigFeature.disabled,
     tools: Path = Path("/"),
-    sandbox: Sequence[PathString] = (),
+    sandbox: SandboxProtocol = nosandbox
 ) -> Path:
     if src == dst:
         return dst