if path.is_dir():
bases += [path]
elif path.suffix == ".tar":
- extract_tar(
- path, d,
- tools=context.config.tools(),
- sandbox=context.sandbox(options=["--ro-bind", path, path, "--bind", d.parent, d.parent]),
- )
+ extract_tar(path, d, tools=context.config.tools(), sandbox=context.sandbox)
bases += [d]
elif path.suffix == ".raw":
run(["systemd-dissect", "-M", path, d])
with complete_step("Removing files…"):
for pattern in context.config.remove_files:
- rmtree(*context.root.glob(pattern.lstrip("/")),
- sandbox=context.sandbox(options=["--bind", context.root, context.root]))
+ rmtree(*context.root.glob(pattern.lstrip("/")), sandbox=context.sandbox)
def install_distribution(context: Context) -> None:
preserve=preserve,
use_subvolumes=context.config.use_subvolumes,
tools=context.config.tools(),
- sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", t.parent, t.parent]),
+ sandbox=context.sandbox,
)
if src.is_dir() or (src.is_file() and target):
copy()
elif src.suffix == ".tar":
- extract_tar(
- src, t,
- tools=context.config.tools(),
- # Make sure tar uses user/group information from the root directory instead of the host.
- sandbox=context.sandbox(options=["--bind", dst, dst, *finalize_passwd_mounts(dst)]),
- )
+ extract_tar(src, t, tools=context.config.tools(), sandbox=context.sandbox)
elif src.suffix == ".raw":
run(
["systemd-dissect", "--copy-from", src, "/", t],
with complete_step("Copying in extra packages…"):
for d in context.config.package_directories:
- install_tree(context, d, context.packages)
+ copy_tree(
+ d, context.packages,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox,
+ )
if context.want_local_repo():
with complete_step("Building local package repository"):
return
with complete_step("Copying in build tree…"):
- install_tree(context, context.install_dir, context.root)
+ copy_tree(
+ context.install_dir, context.root,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox,
+ )
def gzip_binary(context: Context) -> str:
for p in intel.iterdir():
f.write(p.read_bytes())
- make_cpio(
- root, microcode,
- tools=context.config.tools(),
- sandbox=context.sandbox(options=["--ro-bind", root, root]),
- )
+ make_cpio(root, microcode, tools=context.config.tools(), sandbox=context.sandbox)
return microcode
include=context.config.kernel_modules_initrd_include,
exclude=context.config.kernel_modules_initrd_exclude,
host=context.config.kernel_modules_initrd_include_host,
- sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+ sandbox=context.sandbox,
),
tools=context.config.tools(),
- sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+ sandbox=context.sandbox,
)
# Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that
def make_uki(context: Context, stub: Path, kver: str, kimg: Path, output: Path) -> None:
microcode = build_microcode_initrd(context)
- make_cpio(
- context.root, context.workspace / "initrd",
- tools=context.config.tools(),
- sandbox=context.sandbox(
- # Make sure cpio uses user/group information from the root directory instead of the host.
- options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
- ),
- )
+ make_cpio(context.root, context.workspace / "initrd", tools=context.config.tools(), sandbox=context.sandbox)
maybe_compress(context, context.config.compress_output, context.workspace / "initrd", context.workspace / "initrd")
initrds = [microcode] if microcode else []
src, dst,
use_subvolumes=context.config.use_subvolumes,
tools=context.config.tools(),
- sandbox=context.sandbox(options=["--bind", src.parent, src.parent, "--bind", dst.parent, dst.parent]),
+ sandbox=context.sandbox,
)
return
include=context.config.kernel_modules_include,
exclude=context.config.kernel_modules_exclude,
host=context.config.kernel_modules_include_host,
- sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+ sandbox=context.sandbox,
)
with complete_step(f"Running depmod for {kver}"):
final, build, manifest = cache_tree_paths(context.config)
with complete_step("Installing cache copies"):
- rmtree(final, sandbox=context.sandbox(options=["--bind", final.parent, final.parent]))
+ rmtree(final, sandbox=context.sandbox)
# We only use the cache-overlay directory for caching if we have a base tree, otherwise we just
# cache the root directory.
context.workspace / "cache-overlay", final,
use_subvolumes=context.config.use_subvolumes,
tools=context.config.tools(),
- sandbox=context.sandbox(
- options=[
- "--bind", context.workspace, context.workspace,
- "--bind", final.parent, final.parent,
- ],
- ),
+ sandbox=context.sandbox,
)
else:
move_tree(
context.root, final,
use_subvolumes=context.config.use_subvolumes,
- sandbox=context.sandbox(
- options=[
- "--bind", context.root.parent, context.root.parent,
- "--bind", final.parent, final.parent,
- ],
- ),
+ sandbox=context.sandbox,
)
if need_build_overlay(context.config) and (context.workspace / "build-overlay").exists():
- rmtree(build, sandbox=context.sandbox(options=["--bind", build.parent, build.parent]))
+ rmtree(build, sandbox=context.sandbox)
move_tree(
context.workspace / "build-overlay", build,
use_subvolumes=context.config.use_subvolumes,
tools=context.config.tools(),
- sandbox=context.sandbox(
- options=[
- "--bind", context.workspace, context.workspace,
- "--bind", build.parent, build.parent,
- ],
- ),
+ sandbox=context.sandbox,
)
manifest.write_text(
final, build, _ = cache_tree_paths(context.config)
with complete_step("Copying cached trees"):
- install_tree(context, final, context.root)
+ copy_tree(
+ final, context.root,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox,
+ )
+
if need_build_overlay(context.config):
(context.workspace / "build-overlay").symlink_to(build)
f, context.config.output_dir_or_cwd(),
use_subvolumes=context.config.use_subvolumes,
tools=context.config.tools(),
- sandbox=context.sandbox(
- options=[
- "--bind", context.staging, context.staging,
- "--bind", context.config.output_dir_or_cwd(), context.config.output_dir_or_cwd(),
- ],
- ),
+ sandbox=context.sandbox,
)
def setup_workspace(args: Args, config: Config) -> Iterator[Path]:
with contextlib.ExitStack() as stack:
workspace = Path(tempfile.mkdtemp(dir=config.workspace_dir_or_default(), prefix="mkosi-workspace"))
- sandbox = config.sandbox(
- options=["--bind", config.workspace_dir_or_default(), config.workspace_dir_or_default()],
- )
- stack.callback(lambda: rmtree(workspace, sandbox=sandbox))
+ stack.callback(lambda: rmtree(workspace, sandbox=config.sandbox))
(workspace / "tmp").mkdir(mode=0o1777)
with scopedenv({"TMPDIR" : os.fspath(workspace / "tmp")}):
with umask(~0o755):
dst.mkdir(parents=True, exist_ok=True)
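+ # Wrap the context sandbox so that the exclude options are appended to whatever mounts copy_tree() asks for.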
+ def sandbox(*, options: Sequence[PathString]) -> list[PathString]:
+ return context.sandbox(options=[*options, *exclude])
+
with flock(src):
copy_tree(
src, dst,
tools=context.config.tools(),
preserve=False,
- sandbox=context.sandbox(
- options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *exclude]
- ),
+ sandbox=sandbox,
)
make_tar(
context.root, context.staging / context.config.output_with_format,
tools=context.config.tools(),
- # Make sure tar uses user/group information from the root directory instead of the host.
- sandbox=context.sandbox(
- options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
- ),
+ sandbox=context.sandbox,
)
elif context.config.output_format == OutputFormat.cpio:
make_cpio(
context.root, context.staging / context.config.output_with_format,
tools=context.config.tools(),
- # Make sure cpio uses user/group information from the root directory instead of the host.
- sandbox=context.sandbox(
- options=["--ro-bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
- ),
+ sandbox=context.sandbox,
)
elif context.config.output_format == OutputFormat.uki:
assert stub and kver and kimg
# SPDX-License-Identifier: LGPL-2.1+
import os
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable
from pathlib import Path
from typing import Optional
from mkosi.log import log_step
from mkosi.run import find_binary, run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, finalize_passwd_mounts, nosandbox
def tar_binary(*, tools: Path = Path("/")) -> str:
]
-def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: Sequence[PathString] = ()) -> None:
+def make_tar(src: Path, dst: Path, *, tools: Path = Path("/"), sandbox: SandboxProtocol = nosandbox) -> None:
log_step(f"Creating tar archive {dst}…")
with dst.open("wb") as f:
".",
],
stdout=f,
- sandbox=sandbox,
+ # Make sure tar uses user/group information from the root directory instead of the host.
+ sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]),
)
*,
log: bool = True,
tools: Path = Path("/"),
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> None:
if log:
log_step(f"Extracting tar archive {src}…")
*tar_exclude_apivfs_tmp(),
],
stdin=f,
- sandbox=sandbox,
+ sandbox=sandbox(
+ # Make sure tar uses user/group information from the root directory instead of the host.
+ options=["--ro-bind", src, src, "--bind", dst.parent, dst.parent, *finalize_passwd_mounts(dst)]
+ ),
)
*,
files: Optional[Iterable[Path]] = None,
tools: Path = Path("/"),
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> None:
if not files:
files = src.rglob("*")
],
input="\0".join(os.fspath(f.relative_to(src)) for f in files),
stdout=f,
- # Make sure cpio uses user/group information from the root directory instead of the host.
- sandbox=sandbox,
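+ # Make sure cpio uses user/group information from the root directory instead of the host.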
+ sandbox=sandbox(options=["--ro-bind", src, src, *finalize_passwd_mounts(src)]),
)
self.root,
use_subvolumes=self.config.use_subvolumes,
tools=config.tools(),
- sandbox=config.sandbox(options=["--bind", self.workspace, self.workspace]),
+ sandbox=config.sandbox,
)
self.staging.mkdir()
if newdb.exists() and not newdb.is_symlink():
with complete_step("Moving rpm database /usr/lib/sysimage/rpm → /var/lib/rpm"):
- rmtree(olddb, sandbox=context.sandbox(options=["--bind", olddb.parent, olddb.parent]))
+ rmtree(olddb, sandbox=context.sandbox)
shutil.move(newdb, olddb)
newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent))
from mkosi.installer.apt import Apt
from mkosi.log import die
from mkosi.run import run
-from mkosi.sandbox import finalize_passwd_mounts
from mkosi.util import umask
Path(o.name), context.root,
log=False,
tools=context.config.tools(),
- # Make sure tar uses user/group information from the root directory instead of the host.
- sandbox=context.sandbox(
- options=["--bind", context.root, context.root, *finalize_passwd_mounts(context.root)],
- ),
+ sandbox=context.sandbox,
)
# Finally, run apt to properly install packages in the chroot without having to worry that maintainer
dst = context.workspace / "package-cache-dir" / d / subdir
dst.mkdir(parents=True, exist_ok=True)
- copy_tree(src, dst, sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", dst, dst]))
+ copy_tree(src, dst, sandbox=context.sandbox)
context.package_cache_dir = context.workspace / "package-cache-dir"
("dpkg", ["var/lib/dpkg"]),
(executable, [f"var/lib/{subdir}", f"var/cache/{subdir}"])):
if always or not find_binary(tool, root=context.root):
- rmtree(*(context.root / p for p in paths),
- sandbox=context.sandbox(options=["--bind", context.root, context.root]))
+ rmtree(*(context.root / p for p in paths), sandbox=context.sandbox)
rpmdb = context.root / "usr/lib/sysimage/rpm"
if not rpmdb.exists():
rpmdb = context.root / "var/lib/rpm"
- rmtree(rpmdb, sandbox=context.sandbox(options=["--bind", rpmdb.parent, rpmdb.parent]))
+ rmtree(rpmdb, sandbox=context.sandbox)
shutil.move(rpmdb_home, rpmdb)
rpmdb_home.symlink_to(os.path.relpath(rpmdb, start=rpmdb_home.parent))
from mkosi.log import complete_step, log_step
from mkosi.run import run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, nosandbox
def loaded_modules() -> list[str]:
kver: str,
modules: Sequence[str],
*,
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> tuple[set[Path], set[Path]]:
"""
Returns a tuple of sets containing the paths to the module and firmware dependencies of the given list
info = ""
for i in range(0, len(nametofile.keys()), 8500):
chunk = list(nametofile.keys())[i:i+8500]
- info += run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk],
- stdout=subprocess.PIPE, sandbox=sandbox).stdout.strip()
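+ # modinfo only reads from the module tree, so a read-only bind of the root directory is enough.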
+ info += run(
+ ["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk],
+ stdout=subprocess.PIPE,
+ sandbox=sandbox(options=["--ro-bind", root, root])
+ ).stdout.strip()
log_step("Calculating required kernel modules and firmware")
include: Sequence[str],
exclude: Sequence[str],
host: bool,
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> Iterator[Path]:
modulesd = root / "usr/lib/modules" / kver
modules = filter_kernel_modules(root, kver, include=include, exclude=exclude, host=host)
include: Sequence[str],
exclude: Sequence[str],
host: bool,
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> None:
if not include and not exclude:
return
from mkosi.log import die
from mkosi.run import run
-from mkosi.types import PathString
+from mkosi.sandbox import SandboxProtocol, nosandbox
@dataclasses.dataclass(frozen=True)
GRUB_BOOT_PARTITION_UUID = "21686148-6449-6e6f-744e-656564454649"
-def find_partitions(image: Path, *, sandbox: Sequence[PathString]) -> list[Partition]:
- output = json.loads(run(["systemd-repart", "--json=short", image],
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
- sandbox=sandbox).stdout)
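+ # systemd-repart only needs read-only access to the image to enumerate its partitions.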
+def find_partitions(image: Path, *, sandbox: SandboxProtocol = nosandbox) -> list[Partition]:
+ output = json.loads(
+ run(
+ ["systemd-repart", "--json=short", image],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ sandbox=sandbox(options=["--ro-bind", image, image]),
+ ).stdout
+ )
return [Partition.from_dict(d) for d in output]
preserve=config.output_format == OutputFormat.directory,
use_subvolumes=config.use_subvolumes,
tools=config.tools(),
- sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]),
+ sandbox=config.sandbox,
)
fork_and_wait(copy)
if config.output_format == OutputFormat.directory:
become_root()
- rmtree(tmp, sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]))
+ rmtree(tmp, sandbox=config.sandbox)
fork_and_wait(rm)
elif config.output_format == OutputFormat.disk:
# We can't rely on gpt-auto-generator when direct kernel booting so synthesize a root=
# kernel argument instead.
- root = finalize_root(
- find_partitions(fname, sandbox=config.sandbox(options=["--ro-bind", fname, fname]))
- )
+ root = finalize_root(find_partitions(fname, sandbox=config.sandbox))
if not root:
die("Cannot perform a direct kernel boot without a root or usr partition")
import uuid
from collections.abc import Sequence
from pathlib import Path
-from typing import Optional
+from typing import Optional, Protocol
from mkosi.types import PathString
from mkosi.user import INVOKING_USER
from mkosi.util import flatten, one_zero
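+ # A sandbox is modelled as a callable that takes the mount options a command needs and returns the
+ # command prefix to run it with. Helpers such as make_tar() or rmtree() can then request exactly the
+ # bind mounts they require instead of every call site having to spell them out.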
+class SandboxProtocol(Protocol):
+ def __call__(self, *, options: Sequence[PathString]) -> list[PathString]: ...
+
+
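+ # Default for helpers that may run unsandboxed: ignore the requested mounts and return an empty prefix.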
+def nosandbox(*, options: Sequence[PathString]) -> list[PathString]:
+ return []
+
+
# https://github.com/torvalds/linux/blob/master/include/uapi/linux/capability.h
class Capability(enum.Enum):
CAP_NET_ADMIN = 12
import shutil
import subprocess
import tempfile
-from collections.abc import Iterator, Sequence
+from collections.abc import Iterator
from pathlib import Path
from mkosi.config import ConfigFeature
from mkosi.log import die
from mkosi.run import find_binary, run
+from mkosi.sandbox import SandboxProtocol, nosandbox
from mkosi.types import PathString
+from mkosi.util import flatten
-def statfs(path: Path, *, sandbox: Sequence[PathString] = ()) -> str:
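+ # stat only inspects the given path, so a read-only bind of it is enough for the sandbox.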
+def statfs(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> str:
return run(["stat", "--file-system", "--format", "%T", path],
- sandbox=sandbox, stdout=subprocess.PIPE).stdout.strip()
+ sandbox=sandbox(options=["--ro-bind", path, path]), stdout=subprocess.PIPE).stdout.strip()
-def is_subvolume(path: Path, *, sandbox: Sequence[PathString] = ()) -> bool:
+def is_subvolume(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> bool:
return path.is_dir() and statfs(path, sandbox=sandbox) == "btrfs" and path.stat().st_ino == 256
*,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
tools: Path = Path("/"),
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> Path:
if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
die("Subvolumes requested but the btrfs command was not found")
if use_subvolumes != ConfigFeature.disabled and find_binary("btrfs", root=tools) is not None:
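+ # btrfs needs write access to the parent directory to create the subvolume in it.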
result = run(["btrfs", "subvolume", "create", path],
- sandbox=sandbox, check=use_subvolumes == ConfigFeature.enabled).returncode
+ sandbox=sandbox(options=["--bind", path.parent, path.parent]),
+ check=use_subvolumes == ConfigFeature.enabled).returncode
else:
result = 1
dereference: bool = False,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
tools: Path = Path("/"),
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> Path:
subvolume = (use_subvolumes == ConfigFeature.enabled or
use_subvolumes == ConfigFeature.auto and find_binary("btrfs", root=tools) is not None)
"--reflink=auto",
src, dst,
]
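+ # Every sandboxed command below needs the source mounted read-only and the destination's parent writable.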
+ options: list[PathString] = ["--ro-bind", src, src, "--bind", dst.parent, dst.parent]
# If the source and destination are both directories, we want to merge the source directory with the
# destination directory. If the source is a file and the destination is a directory, we want to copy
if not preserve
else contextlib.nullcontext()
):
- run(copy, sandbox=sandbox)
+ run(copy, sandbox=sandbox(options=options))
return dst
# btrfs can't snapshot to an existing directory so make sure the destination does not exist.
dst.rmdir()
result = run(["btrfs", "subvolume", "snapshot", src, dst],
- check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox).returncode
+ check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox(options=options)).returncode
if result != 0:
with (
preserve_target_directories_stat(src, dst)
if not preserve
else contextlib.nullcontext()
):
- run(copy, sandbox=sandbox)
+ run(copy, sandbox=sandbox(options=options))
return dst
-def rmtree(*paths: Path, sandbox: Sequence[PathString] = ()) -> None:
+def rmtree(*paths: Path, sandbox: SandboxProtocol = nosandbox) -> None:
if paths:
- run(["rm", "-rf", "--", *paths], sandbox=sandbox)
+ run(["rm", "-rf", "--", *paths],
+ sandbox=sandbox(options=flatten(["--bind", p.parent, p.parent] for p in paths)))
def move_tree(
*,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
tools: Path = Path("/"),
- sandbox: Sequence[PathString] = (),
+ sandbox: SandboxProtocol = nosandbox,
) -> Path:
if src == dst:
return dst