- opensuse
- ubuntu
tools:
- - ""
# TODO: Enable again when https://gitlab.com/qemu-project/qemu/-/issues/2070 is fixed and available in Arch.
# - arch
- debian
--verbose \
-m integration \
--distribution ${{ matrix.distro }} \
- $([[ -n "${{ matrix.tools }}" ]] && echo --tools-tree-distribution=${{ matrix.tools }}) \
+ --tools-tree-distribution ${{ matrix.tools }} \
tests/
## v20
+- The GitHub action no longer builds and installs systemd from
+  source. Instead, `ToolsTree=default` can be used to make sure a
+  recent version of systemd is used for the image build.
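  For example, a minimal sketch (assuming the setting is placed in the
  `[Host]` section alongside the other host settings):

  ```conf
  # mkosi.conf
  [Host]
  ToolsTree=default
  ```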
- Added `EnvironmentFiles=` to read environment variables from
environment files.
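  A minimal sketch (the `[Content]` placement and the file name are
  illustrative assumptions):

  ```conf
  # mkosi.conf
  [Content]
  EnvironmentFiles=mkosi.env
  ```

  where `mkosi.env` holds `KEY=value` lines that are exported into the
  build environment.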
- We drastically reduced how much of the host system we expose to
scripts. Aside from `/usr`, a few directories in `/etc`, `/tmp`,
- `/var/tmp` and various directory configured in mkosi settings, all
- host directories are hidden from scripts and package managers.
+ `/var/tmp` and various directories configured in mkosi settings, all
+ host directories are hidden from scripts, package managers and other
+ tools executed by mkosi.
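  As a minimal sketch of the resulting pattern (modelled on the calls in
  this change, not a literal excerpt), every host path a tool needs must
  now be bound into the sandbox explicitly:

  ```python
  # Only the explicitly bound host paths are visible inside the sandbox;
  # all other host directories stay hidden from the invoked tool.
  run(["systemd-sysusers", "--root", context.root],
      sandbox=context.sandbox(options=["--bind", context.root, context.root]))
  ```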
- Added `SELinuxRelabel=` to specify whether to relabel SELinux files
  or not.
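  A minimal sketch (the `[Content]` placement is an assumption):

  ```conf
  # mkosi.conf
  [Content]
  SELinuxRelabel=yes
  ```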
- Many fixes to tools trees were made and tools trees are now covered by
name: setup-mkosi
-description: Install mkosi and all its dependencies
+description: Install mkosi
runs:
using: composite
sudo apt-get update
sudo apt-get install --assume-yes --no-install-recommends \
archlinux-keyring \
- btrfs-progs \
bubblewrap \
debian-archive-keyring \
dnf \
- e2fsprogs \
- erofs-utils \
- mtools \
- ovmf \
pacman-package-manager \
- python3-pefile \
- python3-pyelftools \
- qemu-system-x86 \
- squashfs-tools \
- swtpm \
systemd-container \
- xfsprogs \
zypper
sudo pacman-key --init
sudo pacman-key --populate archlinux
- - name: Update systemd
- shell: bash
- working-directory: ${{ github.action_path }}
- run: |
- echo "deb-src http://archive.ubuntu.com/ubuntu/ $(lsb_release -cs) main restricted universe multiverse" | sudo tee -a /etc/apt/sources.list
- sudo apt-get update
- sudo apt-get build-dep systemd
- sudo apt-get install --assume-yes --no-install-recommends libfdisk-dev libtss2-dev
-
- git clone https://github.com/systemd/systemd-stable --single-branch --branch=v255-stable --depth=1 systemd
- meson setup systemd/build systemd \
- -D repart=true \
- -D efi=true \
- -D bootloader=true \
- -D ukify=true \
- -D firstboot=true \
- -D blkid=true \
- -D openssl=true \
- -D tpm2=true
-
- BINARIES=(
- bootctl
- kernel-install
- systemctl
- systemd-dissect
- systemd-firstboot
- systemd-measure
- systemd-nspawn
- systemd-repart
- udevadm
- ukify
- )
-
- ninja -C systemd/build ${BINARIES[@]}
-
- for BINARY in "${BINARIES[@]}"; do
- sudo ln -svf $PWD/systemd/build/$BINARY /usr/bin/$BINARY
- $BINARY --version
- done
-
- # Make sure we have mkfs.xfs that can handle spaces in protofiles.
- # TODO: Drop when we move to the next Ubuntu LTS.
- - name: Update xfsprogs
- shell: bash
- working-directory: ${{ github.action_path }}
- run: |
- sudo apt-get install --assume-yes --no-install-recommends \
- make \
- gcc \
- autoconf \
- automake \
- libtool \
- libdevmapper-dev \
- libblkid-dev \
- libicu-dev \
- libedit-dev \
- libinih-dev \
- liburcu-dev \
- uuid-dev
-
- git clone --single-branch --branch v6.4.0 https://git.kernel.org/pub/scm/fs/xfs/xfsprogs-dev.git
- cd xfsprogs-dev
- make -j $(nproc)
- sudo make install
-
- name: Install
shell: bash
run: sudo ln -svf ${{ github.action_path }}/bin/mkosi /usr/bin/mkosi
import mkosi.resources
from mkosi.archive import extract_tar, make_cpio, make_tar
-from mkosi.bubblewrap import bwrap, chroot_cmd
from mkosi.burn import run_burn
from mkosi.config import (
Args,
from mkosi.kmod import gen_required_kernel_modules, process_kernel_modules
from mkosi.log import ARG_DEBUG, complete_step, die, log_notice, log_step
from mkosi.manifest import Manifest
-from mkosi.mounts import mount_overlay, mount_usr
+from mkosi.mounts import mount_overlay
from mkosi.pager import page
from mkosi.partition import Partition, finalize_root, finalize_roothash
-from mkosi.qemu import KernelType, QemuDeviceNode, copy_ephemeral, run_qemu, run_ssh
+from mkosi.qemu import KernelType, copy_ephemeral, run_qemu, run_ssh
from mkosi.run import become_root, find_binary, fork_and_wait, init_mount_namespace, run
+from mkosi.sandbox import chroot_cmd, finalize_crypto_mounts
from mkosi.tree import copy_tree, move_tree, rmtree
from mkosi.types import PathString
from mkosi.util import (
INVOKING_USER,
- chdir,
flatten,
format_rlimit,
make_executable,
resource_path,
round_up,
scopedenv,
- try_import,
umask,
)
from mkosi.versioncomp import GenericVersion
with complete_step("Removing files…"):
for pattern in context.config.remove_files:
- for p in context.root.glob(pattern.lstrip("/")):
- rmtree(p)
+ rmtree(*context.root.glob(pattern.lstrip("/")),
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
def install_distribution(context: Context) -> None:
yield flatten(["--bind", src, target] for src, target in sorted(set(mounts), key=lambda s: s[1]))
-def script_maybe_chroot(script: Path, mountpoint: str) -> list[str]:
- return ["mkosi-chroot", mountpoint] if script.suffix == ".chroot" else [os.fspath(script)]
-
-
@contextlib.contextmanager
def finalize_scripts(scripts: Mapping[str, Sequence[PathString]] = {}) -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="mkosi-scripts") as d:
helpers: dict[str, Sequence[PathString]], # FIXME: change dict to Mapping when PyRight is fixed
) -> contextlib.AbstractContextManager[Path]:
scripts: dict[str, Sequence[PathString]] = {}
- if find_binary("git"):
+ if find_binary("git", root=context.config.tools()):
scripts["git"] = ("git", "-c", "safe.directory=*")
for binary in ("useradd", "groupadd"):
- if find_binary(binary):
+ if find_binary(binary, root=context.config.tools()):
scripts[binary] = (binary, "--root", context.root)
return finalize_scripts(scripts | helpers | package_manager_scripts(context))
cd = stack.enter_context(finalize_chroot_scripts(context))
for script in context.config.prepare_scripts:
- helpers = {
- "mkosi-chroot": chroot_cmd(
- context.root,
- resolve=True,
- options=[
- "--bind", script, "/work/prepare",
- "--bind", Path.cwd(), "/work/src",
- "--bind", cd, "/work/scripts",
- "--chdir", "/work/src",
- "--setenv", "SRCDIR", "/work/src",
- "--setenv", "BUILDROOT", "/",
- ],
- ),
+ chroot = chroot_cmd(
+ context.root,
+ resolve=True,
+ tools=context.config.tools(),
+ options=[
+ "--bind", script, "/work/prepare",
+ "--bind", Path.cwd(), "/work/src",
+ "--bind", cd, "/work/scripts",
+ "--chdir", "/work/src",
+ "--setenv", "SRCDIR", "/work/src",
+ "--setenv", "BUILDROOT", "/",
+ ],
+ )
+
+ helpers: dict[str, Sequence[PathString]] = {
+ "mkosi-chroot": chroot,
"mkosi-as-caller" : MKOSI_AS_CALLER,
}
hd = stack.enter_context(finalize_host_scripts(context, helpers))
with complete_step(step_msg.format(script)):
- bwrap(
- context,
- script_maybe_chroot(script, "/work/prepare") + [arg],
- network=True,
- options=sources + ["--ro-bind", script, script],
- scripts=hd,
+ run(
+ ["/work/prepare" if script.suffix == ".chroot" else script, arg],
env=env | context.config.environment,
stdin=sys.stdin,
+ sandbox=context.sandbox(
+ network=True,
+ options=sources + [
+ "--ro-bind", script, script,
+ "--ro-bind", cd, cd,
+ "--bind", context.root, context.root,
+ "--bind", context.cache_dir, context.cache_dir,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ "--chdir", Path.cwd(),
+ ],
+ scripts=hd,
+ ) + (chroot if script.suffix == ".chroot" else []),
)
finalize_source_mounts(context.config) as sources,
):
for script in context.config.build_scripts:
+ chroot = chroot_cmd(
+ context.root,
+ resolve=context.config.with_network,
+ tools=context.config.tools(),
+ options=[
+ "--bind", script, "/work/build-script",
+ "--bind", context.install_dir, "/work/dest",
+ "--bind", context.staging, "/work/out",
+ "--bind", Path.cwd(), "/work/src",
+ "--bind", cd, "/work/scripts",
+ *(
+ ["--bind", os.fspath(context.config.build_dir), "/work/build"]
+ if context.config.build_dir
+ else []
+ ),
+ "--chdir", "/work/src",
+ "--setenv", "SRCDIR", "/work/src",
+ "--setenv", "DESTDIR", "/work/dest",
+ "--setenv", "OUTPUTDIR", "/work/out",
+ "--setenv", "BUILDROOT", "/",
+ *(["--setenv", "BUILDDIR", "/work/build"] if context.config.build_dir else []),
+ ],
+ )
+
helpers = {
- "mkosi-chroot": chroot_cmd(
- context.root,
- resolve=context.config.with_network,
- options=[
- "--bind", script, "/work/build-script",
- "--bind", context.install_dir, "/work/dest",
- "--bind", context.staging, "/work/out",
- "--bind", Path.cwd(), "/work/src",
- "--bind", cd, "/work/scripts",
- *([
- "--bind", os.fspath(context.config.build_dir), "/work/build"]
- if context.config.build_dir
- else []
- ),
- "--chdir", "/work/src",
- "--setenv", "SRCDIR", "/work/src",
- "--setenv", "DESTDIR", "/work/dest",
- "--setenv", "OUTPUTDIR", "/work/out",
- "--setenv", "BUILDROOT", "/",
- *(["--setenv", "BUILDDIR", "/work/build"] if context.config.build_dir else []),
- ],
- ),
- "mkosi-as-caller" : MKOSI_AS_CALLER,
+ "mkosi-chroot": chroot,
+ "mkosi-as-caller": MKOSI_AS_CALLER,
}
cmdline = context.args.cmdline if context.args.verb == Verb.build else []
finalize_host_scripts(context, helpers) as hd,
complete_step(f"Running build script {script}…"),
):
- bwrap(
- context,
- script_maybe_chroot(script, "/work/build-script") + cmdline,
- network=context.config.with_network,
- options=sources + ["--ro-bind", script, script],
- scripts=hd,
+ run(
+ ["/work/build-script" if script.suffix == ".chroot" else script, *cmdline],
env=env | context.config.environment,
stdin=sys.stdin,
+ sandbox=context.sandbox(
+ network=context.config.with_network,
+ options=sources + [
+ "--ro-bind", script, script,
+ "--ro-bind", cd, cd,
+ "--bind", context.root, context.root,
+ "--bind", context.install_dir, context.install_dir,
+ "--bind", context.staging, context.staging,
+ *(
+ ["--bind", os.fspath(context.config.build_dir), os.fspath(context.config.build_dir)]
+ if context.config.build_dir
+ else []
+ ),
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ "--chdir", Path.cwd(),
+ ],
+ scripts=hd,
+ ) + (chroot if script.suffix == ".chroot" else []),
)
finalize_source_mounts(context.config) as sources,
):
for script in context.config.postinst_scripts:
+ chroot = chroot_cmd(
+ context.root,
+ resolve=context.config.with_network,
+ tools=context.config.tools(),
+ options=[
+ "--bind", script, "/work/postinst",
+ "--bind", context.staging, "/work/out",
+ "--bind", Path.cwd(), "/work/src",
+ "--bind", cd, "/work/scripts",
+ "--chdir", "/work/src",
+ "--setenv", "SRCDIR", "/work/src",
+ "--setenv", "OUTPUTDIR", "/work/out",
+ "--setenv", "BUILDROOT", "/",
+ ],
+ )
+
helpers = {
- "mkosi-chroot": chroot_cmd(
- context.root,
- resolve=context.config.with_network,
- options=[
- "--bind", script, "/work/postinst",
- "--bind", context.staging, "/work/out",
- "--bind", Path.cwd(), "/work/src",
- "--bind", cd, "/work/scripts",
- "--chdir", "/work/src",
- "--setenv", "SRCDIR", "/work/src",
- "--setenv", "OUTPUTDIR", "/work/out",
- "--setenv", "BUILDROOT", "/",
- ],
- ),
- "mkosi-as-caller" : MKOSI_AS_CALLER,
+ "mkosi-chroot": chroot,
+ "mkosi-as-caller": MKOSI_AS_CALLER,
}
with (
finalize_host_scripts(context, helpers) as hd,
complete_step(f"Running postinstall script {script}…"),
):
- bwrap(
- context,
- script_maybe_chroot(script, "/work/postinst") + ["final"],
- network=context.config.with_network,
- options=sources + ["--ro-bind", script, script],
- scripts=hd,
+ run(
+ ["/work/postinst" if script.suffix == ".chroot" else script, "final"],
env=env | context.config.environment,
stdin=sys.stdin,
+ sandbox=context.sandbox(
+ network=context.config.with_network,
+ options=sources + [
+ "--ro-bind", script, script,
+ "--ro-bind", cd, cd,
+ "--bind", context.root, context.root,
+ "--bind", context.staging, context.staging,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ "--chdir", Path.cwd(),
+ ],
+ scripts=hd,
+ ) + (chroot if script.suffix == ".chroot" else []),
)
finalize_source_mounts(context.config) as sources,
):
for script in context.config.finalize_scripts:
+ chroot = chroot_cmd(
+ context.root,
+ resolve=context.config.with_network,
+ tools=context.config.tools(),
+ options=[
+ "--bind", script, "/work/finalize",
+ "--bind", context.staging, "/work/out",
+ "--bind", Path.cwd(), "/work/src",
+ "--bind", cd, "/work/scripts",
+ "--chdir", "/work/src",
+ "--setenv", "SRCDIR", "/work/src",
+ "--setenv", "OUTPUTDIR", "/work/out",
+ "--setenv", "BUILDROOT", "/",
+ ],
+ )
+
helpers = {
- "mkosi-chroot": chroot_cmd(
- context.root,
- resolve=context.config.with_network,
- options=[
- "--bind", script, "/work/finalize",
- "--bind", context.staging, "/work/out",
- "--bind", Path.cwd(), "/work/src",
- "--bind", cd, "/work/scripts",
- "--chdir", "/work/src",
- "--setenv", "SRCDIR", "/work/src",
- "--setenv", "OUTPUTDIR", "/work/out",
- "--setenv", "BUILDROOT", "/",
- ],
- ),
- "mkosi-as-caller" : MKOSI_AS_CALLER,
+ "mkosi-chroot": chroot,
+ "mkosi-as-caller": MKOSI_AS_CALLER,
}
with (
finalize_host_scripts(context, helpers) as hd,
complete_step(f"Running finalize script {script}…"),
):
- bwrap(
- context,
- script_maybe_chroot(script, "/work/finalize"),
- network=context.config.with_network,
- options=sources + ["--ro-bind", script, script],
- scripts=hd,
+ run(
+ ["/work/finalize" if script.suffix == ".chroot" else script],
env=env | context.config.environment,
stdin=sys.stdin,
+ sandbox=context.sandbox(
+ network=context.config.with_network,
+ options=sources + [
+ "--ro-bind", script, script,
+ "--ro-bind", cd, cd,
+ "--bind", context.root, context.root,
+ "--bind", context.staging, context.staging,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ "--chdir", Path.cwd(),
+ ],
+ scripts=hd,
+ ) + (chroot if script.suffix == ".chroot" else []),
)
def certificate_common_name(context: Context, certificate: Path) -> str:
- output = bwrap(
- context,
+ output = run(
[
"openssl",
"x509",
"-in", certificate,
],
stdout=subprocess.PIPE,
+ sandbox=context.sandbox(options=["--ro-bind", certificate, certificate]),
).stdout
for line in output.splitlines():
# pesign takes a certificate directory and a certificate common name as input arguments, so we have
# to transform our input key and cert into that format. Adapted from
# https://www.mankier.com/1/pesign#Examples-Signing_with_the_certificate_and_private_key_in_individual_files
- bwrap(
- context,
- [
- "openssl",
- "pkcs12",
- "-export",
- # Arcane incantation to create a pkcs12 certificate without a password.
- "-keypbe", "NONE",
- "-certpbe", "NONE",
- "-nomaciter",
- "-passout", "pass:",
- "-out", context.workspace / "secure-boot.p12",
- "-inkey", context.config.secure_boot_key,
- "-in", context.config.secure_boot_certificate,
- ],
- )
+ with open(context.workspace / "secure-boot.p12", "wb") as f:
+ run(
+ [
+ "openssl",
+ "pkcs12",
+ "-export",
+ # Arcane incantation to create a pkcs12 certificate without a password.
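+ # (-keypbe/-certpbe NONE leave the key and certificate unencrypted, -nomaciter
+ # skips MAC iteration, and -passout pass: sets an empty export password.)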
+ "-keypbe", "NONE",
+ "-certpbe", "NONE",
+ "-nomaciter",
+ "-passout", "pass:",
+ "-inkey", context.config.secure_boot_key,
+ "-in", context.config.secure_boot_certificate,
+ ],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind", context.config.secure_boot_key, context.config.secure_boot_key,
+ "--ro-bind", context.config.secure_boot_certificate, context.config.secure_boot_certificate,
+ ],
+ ),
+ )
- bwrap(
- context,
+ (context.workspace / "pesign").mkdir(exist_ok=True)
+
+ run(
[
"pk12util",
"-K", "",
"-i", context.workspace / "secure-boot.p12",
"-d", context.workspace / "pesign",
],
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind", context.workspace / "secure-boot.p12", context.workspace / "secure-boot.p12",
+ "--bind", context.workspace / "pesign", context.workspace / "pesign",
+ ],
+ ),
)
if (
context.config.secure_boot_sign_tool == SecureBootSignTool.sbsign or
context.config.secure_boot_sign_tool == SecureBootSignTool.auto and
- shutil.which("sbsign") is not None
+ find_binary("sbsign", root=context.config.tools()) is not None
):
- bwrap(
- context,
- [
- "sbsign",
- "--key", context.config.secure_boot_key,
- "--cert", context.config.secure_boot_certificate,
- "--output", output,
- input,
- ],
- )
+ with open(output, "wb") as f:
+ run(
+ [
+ "sbsign",
+ "--key", context.config.secure_boot_key,
+ "--cert", context.config.secure_boot_certificate,
+ "--output", "/dev/stdout",
+ input,
+ ],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind", context.config.secure_boot_key, context.config.secure_boot_key,
+ "--ro-bind", context.config.secure_boot_certificate, context.config.secure_boot_certificate,
+ "--ro-bind", input, input,
+ ]
+ ),
+ )
elif (
context.config.secure_boot_sign_tool == SecureBootSignTool.pesign or
context.config.secure_boot_sign_tool == SecureBootSignTool.auto and
- shutil.which("pesign") is not None
+ find_binary("pesign", root=context.config.tools()) is not None
):
pesign_prepare(context)
- bwrap(
- context,
- [
- "pesign",
- "--certdir", context.workspace / "pesign",
- "--certificate", certificate_common_name(context, context.config.secure_boot_certificate),
- "--sign",
- "--force",
- "--in", input,
- "--out", output,
- ],
- )
+ with open(output, "wb") as f:
+ run(
+ [
+ "pesign",
+ "--certdir", context.workspace / "pesign",
+ "--certificate", certificate_common_name(context, context.config.secure_boot_certificate),
+ "--sign",
+ "--force",
+ "--in", input,
+ "--out", "/dev/stdout",
+ ],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind", context.workspace / "pesign", context.workspace / "pesign",
+ "--ro-bind", input, input,
+ ]
+ ),
+ )
else:
die("One of sbsign or pesign is required to use SecureBoot=")
if not any(gen_kernel_images(context)) and context.config.bootable == ConfigFeature.auto:
return
- if not shutil.which("bootctl"):
+ if not find_binary("bootctl", root=context.config.tools()):
if context.config.bootable == ConfigFeature.enabled:
die("An EFI bootable image with systemd-boot was requested but bootctl was not found")
return
sign_efi_binary(context, input, output)
with complete_step("Installing systemd-boot…"):
- bwrap(
- context,
+ run(
["bootctl", "install", "--root", context.root, "--all-architectures", "--no-variables"],
env={"SYSTEMD_ESP_PATH": "/efi"},
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]),
)
if context.config.shim_bootloader != ShimBootloader.none:
keys.mkdir(parents=True, exist_ok=True)
# sbsiglist expects a DER certificate.
- bwrap(
- context,
- [
- "openssl",
- "x509",
- "-outform", "DER",
- "-in", context.config.secure_boot_certificate,
- "-out", context.workspace / "mkosi.der",
- ],
- )
-
- bwrap(
- context,
- [
- "sbsiglist",
- "--owner", str(uuid.uuid4()),
- "--type", "x509",
- "--output", context.workspace / "mkosi.esl",
- context.workspace / "mkosi.der",
- ],
- )
+ with umask(~0o600), open(context.workspace / "mkosi.der", "wb") as f:
+ run(
+ [
+ "openssl",
+ "x509",
+ "-outform", "DER",
+ "-in", context.config.secure_boot_certificate,
+ ],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind",
+ context.config.secure_boot_certificate,
+ context.config.secure_boot_certificate,
+ ],
+ ),
+ )
- # We reuse the key for all secure boot databases to keep things simple.
- for db in ["PK", "KEK", "db"]:
- bwrap(
- context,
+ with umask(~0o600), open(context.workspace / "mkosi.esl", "wb") as f:
+ run(
[
- "sbvarsign",
- "--attr",
- "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
- "--key", context.config.secure_boot_key,
- "--cert", context.config.secure_boot_certificate,
- "--output", keys / f"{db}.auth",
- db,
- context.workspace / "mkosi.esl",
+ "sbsiglist",
+ "--owner", str(uuid.uuid4()),
+ "--type", "x509",
+ "--output", "/dev/stdout",
+ context.workspace / "mkosi.der",
],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=["--ro-bind", context.workspace / "mkosi.der", context.workspace / "mkosi.der"]
+ ),
)
+ # We reuse the key for all secure boot databases to keep things simple.
+ for db in ["PK", "KEK", "db"]:
+ with umask(~0o600), open(keys / f"{db}.auth", "wb") as f:
+ run(
+ [
+ "sbvarsign",
+ "--attr",
+ "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
+ "--key", context.config.secure_boot_key,
+ "--cert", context.config.secure_boot_certificate,
+ "--output", "/dev/stdout",
+ db,
+ context.workspace / "mkosi.esl",
+ ],
+ stdout=f,
+ sandbox=context.sandbox(
+ options=[
+ "--ro-bind", context.config.secure_boot_key, context.config.secure_boot_key,
+ "--ro-bind",
+ context.config.secure_boot_certificate,
+ context.config.secure_boot_certificate,
+ "--ro-bind", context.workspace / "mkosi.esl", context.workspace / "mkosi.esl",
+ ],
+ ),
+ )
+
def find_and_install_shim_binary(
context: Context,
earlyconfig.flush()
- bwrap(
- context,
+ run(
[
mkimage,
"--directory", directory,
"search",
"search_fs_file",
],
- options=["--bind", context.root / "usr", "/usr"],
+ sandbox=context.sandbox(
+ options=["--ro-bind", context.root / "usr", "/usr", "--bind", context.root, context.root],
+ ),
)
for p in directory.glob("*.mod"):
with complete_step("Installing grub boot loader…"):
# We don't set up the mountinfo bind mount with bwrap because we need to know the child process pid to
# be able to do the mount and we don't know the pid beforehand.
- bwrap(
- context,
+ run(
[
"sh", "-c", f"mount --bind {mountinfo} /proc/$$/mountinfo && exec $0 \"$@\"",
setup,
*(["--verbose"] if ARG_DEBUG.get() else []),
context.staging / context.config.output_with_format,
],
- options=["--bind", context.root / "usr", "/usr"],
+ sandbox=context.sandbox(
+ options=[
+ "--bind", context.root / "usr", "/usr",
+ "--bind", context.root, context.root,
+ "--bind", context.staging, context.staging
+ ],
+ ),
)
context: Context,
src: Path,
dst: Path,
- target: Optional[Path] = None,
*,
+ target: Optional[Path] = None,
preserve: bool = True,
) -> None:
t = dst
with umask(~0o755):
t.parent.mkdir(parents=True, exist_ok=True)
+ def copy() -> None:
+ copy_tree(
+ src, t,
+ preserve=preserve,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox(options=["--ro-bind", src, src, "--bind", t.parent, t.parent]),
+ )
+
if src.is_dir() or (src.is_file() and target):
- copy_tree(src, t, preserve=preserve, use_subvolumes=context.config.use_subvolumes)
+ copy()
elif src.suffix == ".tar":
extract_tar(context, src, t)
elif src.suffix == ".raw":
- run(["systemd-dissect", "--copy-from", src, "/", t])
+ run(
+ ["systemd-dissect", "--copy-from", src, "/", t],
+ sandbox=context.sandbox(
+ devices=True,
+ network=True,
+ options=["--ro-bind", src, src, "--bind", t.parent, t.parent],
+ ),
+ )
else:
# If we get an unknown file without a target, we just copy it into /.
- copy_tree(src, t, preserve=preserve, use_subvolumes=context.config.use_subvolumes)
+ copy()
def install_base_trees(context: Context) -> None:
with complete_step("Copying in skeleton file trees…"):
for tree in context.config.skeleton_trees:
- install_tree(context, tree.source, context.root, tree.target, preserve=False)
+ install_tree(context, tree.source, context.root, target=tree.target, preserve=False)
def install_package_manager_trees(context: Context) -> None:
with complete_step("Copying in package manager file trees…"):
for tree in context.config.package_manager_trees:
- install_tree(context, tree.source, context.workspace / "pkgmngr", tree.target, preserve=False)
+ install_tree(context, tree.source, context.workspace / "pkgmngr", target=tree.target, preserve=False)
def install_extra_trees(context: Context) -> None:
with complete_step("Copying in extra file trees…"):
for tree in context.config.extra_trees:
- install_tree(context, tree.source, context.root, tree.target, preserve=False)
+ install_tree(context, tree.source, context.root, target=tree.target, preserve=False)
def install_build_dest(context: Context) -> None:
return
with complete_step("Copying in build tree…"):
- copy_tree(context.install_dir, context.root, use_subvolumes=context.config.use_subvolumes)
+ install_tree(context, context.install_dir, context.root)
-def gzip_binary() -> str:
- return "pigz" if shutil.which("pigz") else "gzip"
+def gzip_binary(context: Context) -> str:
+ return "pigz" if find_binary("pigz", root=context.config.tools()) else "gzip"
def gen_kernel_images(context: Context) -> Iterator[tuple[str, Path]]:
# do weird stuff. But let's make sure we're not returning UKIs as the
# UKI on Fedora is named vmlinuz-virt.efi.
for kimg in kver.glob("vmlinuz*"):
- if KernelType.identify(kimg) != KernelType.uki:
+ if KernelType.identify(context.config, kimg) != KernelType.uki:
yield kver.name, kimg
break
*(["--root-password", rootpwopt] if rootpwopt else []),
*([f"--environment={k}='{v}'" for k, v in context.config.environment.items()]),
*(["--tools-tree", str(context.config.tools_tree)] if context.config.tools_tree else []),
+ *([f"--extra-search-path={p}" for p in context.config.extra_search_paths]),
*(["-f"] * context.args.force),
]
include=context.config.kernel_modules_initrd_include,
exclude=context.config.kernel_modules_initrd_exclude,
host=context.config.kernel_modules_initrd_include_host,
+ sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
)
)
assert initrds
if len(initrds) == 1:
- copy_tree(initrds[0], output)
+ shutil.copy2(initrds[0], output)
return output
seq = io.BytesIO()
pefile = textwrap.dedent(
f"""\
import pefile
+ import sys
from pathlib import Path
pe = pefile.PE("{binary}", fast_load=True)
section = {{s.Name.decode().strip("\\0"): s for s in pe.sections}}["{section}"]
- Path("{output}").write_bytes(section.get_data(length=section.Misc_VirtualSize))
+ sys.stdout.buffer.write(section.get_data(length=section.Misc_VirtualSize))
"""
)
- bwrap(context, [python_binary(context.config)], input=pefile)
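+ # Run the snippet with the configured Python interpreter, which is expected to provide
+ # the pefile module, and stream the extracted PE section from its stdout into the output file.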
+ with open(output, "wb") as f:
+ run(
+ [python_binary(context.config)],
+ input=pefile,
+ stdout=f,
+ sandbox=context.sandbox(options=["--ro-bind", binary, binary])
+ )
def build_uki(
die(f"Architecture {context.config.architecture} does not support UEFI")
cmd: list[PathString] = [
- shutil.which("ukify") or "/usr/lib/systemd/ukify",
+ find_binary("ukify", root=context.config.tools()) or "/usr/lib/systemd/ukify",
"--cmdline", f"@{context.workspace / 'cmdline'}",
"--os-release", f"@{context.root / 'usr/lib/os-release'}",
"--stub", stub,
"--uname", kver,
]
- if not context.config.tools_tree:
- for p in context.config.extra_search_paths:
- cmd += ["--tools", p]
+ options: list[PathString] = [
+ "--bind", output.parent, output.parent,
+ "--ro-bind", context.workspace / "cmdline", context.workspace / "cmdline",
+ "--ro-bind", context.root / "usr/lib/os-release", context.root / "usr/lib/os-release",
+ "--ro-bind", stub, stub,
+ ]
if context.config.secure_boot:
assert context.config.secure_boot_key
"--secureboot-certificate",
context.config.secure_boot_certificate,
]
+ options += [
+ "--ro-bind", context.config.secure_boot_key, context.config.secure_boot_key,
+ "--ro-bind", context.config.secure_boot_certificate, context.config.secure_boot_certificate,
+ ]
else:
pesign_prepare(context)
cmd += [
"--secureboot-certificate-name",
certificate_common_name(context, context.config.secure_boot_certificate),
]
+ options += ["--ro-bind", context.workspace / "pesign", context.workspace / "pesign"]
- sign_expected_pcr = (context.config.sign_expected_pcr == ConfigFeature.enabled or
- (context.config.sign_expected_pcr == ConfigFeature.auto and
- shutil.which("systemd-measure") is not None))
+ sign_expected_pcr = (
+ context.config.sign_expected_pcr == ConfigFeature.enabled or
+ (
+ context.config.sign_expected_pcr == ConfigFeature.auto and
+ find_binary("systemd-measure", "/usr/lib/systemd/systemd-measure", root=context.config.tools())
+ )
+ )
if sign_expected_pcr:
cmd += [
"--pcr-private-key", context.config.secure_boot_key,
"--pcr-banks", "sha1,sha256",
]
+ options += ["--ro-bind", context.config.secure_boot_key, context.config.secure_boot_key]
cmd += ["build", "--linux", kimg]
+ options += ["--ro-bind", kimg, kimg]
for initrd in initrds:
cmd += ["--initrd", initrd]
+ options += ["--ro-bind", initrd, initrd]
with complete_step(f"Generating unified kernel image for kernel version {kver}"):
- bwrap(context, cmd)
+ run(cmd, sandbox=context.sandbox(options=options))
def want_efi(config: Config) -> bool:
def find_entry_token(context: Context) -> str:
if (
- "--version" not in run(["kernel-install", "--help"], stdout=subprocess.PIPE).stdout or
- systemd_tool_version("kernel-install") < "255.1"
+ "--version" not in run(["kernel-install", "--help"],
+ stdout=subprocess.PIPE, sandbox=context.sandbox()).stdout or
+ systemd_tool_version(context.config, "kernel-install") < "255.1"
):
return context.config.image_id or context.config.distribution.name
- output = json.loads(bwrap(context, ["kernel-install", "--root", context.root, "--json=pretty", "inspect"],
- stdout=subprocess.PIPE).stdout)
+ output = json.loads(run(["kernel-install", "--root", context.root, "--json=pretty", "inspect"],
+ sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
+ stdout=subprocess.PIPE).stdout)
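+ # `kernel-install inspect` prints a JSON object; only its "EntryToken" field (e.g. a
+ # distribution name or machine ID) is consumed here.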
logging.debug(json.dumps(output, indent=4))
return cast(str, output["EntryToken"])
extract_pe_section(context, output, ".initrd", context.staging / context.config.output_split_initrd)
-def compressor_command(compression: Compression) -> list[PathString]:
+def compressor_command(context: Context, compression: Compression) -> list[PathString]:
"""Returns a command suitable for compressing archives."""
if compression == Compression.gz:
- return [gzip_binary(), "--fast", "--stdout", "-"]
+ return [gzip_binary(context), "--fast", "--stdout", "-"]
elif compression == Compression.xz:
return ["xz", "--check=crc32", "--fast", "-T0", "--stdout", "-"]
elif compression == Compression.zstd:
def maybe_compress(context: Context, compression: Compression, src: Path, dst: Optional[Path] = None) -> None:
if not compression or src.is_dir():
if dst:
- move_tree(src, dst, use_subvolumes=context.config.use_subvolumes)
+ move_tree(
+ src, dst,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox(options=["--bind", src.parent, src.parent, "--bind", dst.parent, dst.parent]),
+ )
return
if not dst:
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- bwrap(context, compressor_command(compression), stdin=i, stdout=o)
+ run(compressor_command(context, compression), stdin=i, stdout=o, sandbox=context.sandbox())
def copy_vmlinuz(context: Context) -> None:
def calculate_sha256sum(context: Context) -> None:
- if context.config.output_format == OutputFormat.directory:
- return None
-
if not context.config.checksum:
- return None
+ return
+
+ if context.config.output_format == OutputFormat.directory:
+ return
with complete_step("Calculating SHA256SUMS…"):
with open(context.workspace / context.config.output_checksum, "w") as f:
def calculate_signature(context: Context) -> None:
- if not context.config.sign:
- return None
+ if not context.config.sign or not context.config.checksum:
+ return
- with complete_step("Signing SHA256SUMS…"):
- cmdline: list[PathString] = ["gpg", "--detach-sign"]
+ if context.config.output_format == OutputFormat.directory:
+ return
- # Need to specify key before file to sign
- if context.config.key is not None:
- cmdline += ["--default-key", context.config.key]
+ # GPG messes with the user's home directory so we run it as the invoking user.
- cmdline += [
- "--output", context.staging / context.config.output_signature,
- context.staging / context.config.output_checksum,
- ]
+ cmdline: list[PathString] = [
+ "setpriv",
+ f"--reuid={INVOKING_USER.uid}",
+ f"--regid={INVOKING_USER.gid}",
+ "--clear-groups",
+ "gpg",
+ "--detach-sign",
+ ]
- # Set the path of the keyring to use based on the environment if possible and fallback to the default
- # path. Without this the keyring for the root user will instead be used which will fail for a
- # non-root build.
- env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(Path(os.environ["HOME"]) / ".gnupg")))
- if sys.stderr.isatty():
- env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno()))
+ # Need to specify key before file to sign
+ if context.config.key is not None:
+ cmdline += ["--default-key", context.config.key]
- # Do not output warnings about keyring permissions
- bwrap(context, cmdline, stderr=subprocess.DEVNULL, env=env)
+ cmdline += ["--output", "-", "-"]
+
+ home = Path(context.config.environment.get("GNUPGHOME", INVOKING_USER.home() / ".gnupg"))
+ if not home.exists():
+ die(f"GPG home {home} not found")
+
+ env = dict(GNUPGHOME=os.fspath(home))
+ if sys.stderr.isatty():
+ env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno()))
+
+ with (
+ complete_step("Signing SHA256SUMS…"),
+ open(context.staging / context.config.output_checksum, "rb") as i,
+ open(context.staging / context.config.output_signature, "wb") as o,
+ ):
+ run(
+ cmdline,
+ env=env,
+ stdin=i,
+ stdout=o,
+ sandbox=context.sandbox(options=["--perms", "755", "--dir", home, "--bind", home, home]),
+ )
def dir_size(path: Union[Path, os.DirEntry[str]]) -> int:
log_step(f"{path} size is {size}, consumes {space}.")
-def empty_directory(path: Path) -> None:
- try:
- rmtree(*path.iterdir())
- except FileNotFoundError:
- pass
-
-
-def unlink_output(args: Args, config: Config) -> None:
- # We remove any cached images if either the user used --force twice, or he/she called "clean" with it
- # passed once. Let's also remove the downloaded package cache if the user specified one additional
- # "--force".
-
- if args.verb == Verb.clean:
- remove_build_cache = args.force > 0
- remove_package_cache = args.force > 1
- else:
- remove_build_cache = args.force > 1
- remove_package_cache = args.force > 2
-
- with complete_step("Removing output files…"):
- if config.output_dir_or_cwd().exists():
- for p in config.output_dir_or_cwd().iterdir():
- if p.name.startswith(config.output):
- rmtree(p)
-
- if remove_build_cache:
- if config.cache_dir:
- for p in cache_tree_paths(config):
- if p.exists():
- with complete_step(f"Removing cache entry {p}…"):
- rmtree(p)
-
- if config.build_dir and config.build_dir.exists() and any(config.build_dir.iterdir()):
- with complete_step("Clearing out build directory…"):
- empty_directory(config.build_dir)
-
- if remove_package_cache:
- if config.cache_dir and config.cache_dir.exists() and any(config.cache_dir.iterdir()):
- with complete_step("Clearing out package cache…"):
- rmtree(*(
- config.cache_dir / p / d
- for p in ("cache", "lib")
- for d in ("apt", "dnf", "libdnf5", "pacman", "zypp")
- ))
-
-
def cache_tree_paths(config: Config) -> tuple[Path, Path, Path]:
fragments = [config.distribution, config.release, config.architecture]
die(f"Output path {f} exists already. (Consider invocation with --force.)")
-def systemd_tool_version(tool: PathString) -> GenericVersion:
- return GenericVersion(run([tool, "--version"], stdout=subprocess.PIPE).stdout.split()[2].strip("()"))
+def systemd_tool_version(config: Config, tool: PathString) -> GenericVersion:
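+ # systemd tools print e.g. "systemd 255 (255.1)" as the first line of --version output, so the
+ # third whitespace-separated field, stripped of parentheses, carries the precise version.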
+ return GenericVersion(
+ run([tool, "--version"], stdout=subprocess.PIPE, sandbox=config.sandbox()).stdout.split()[2].strip("()")
+ )
-def check_tool(*tools: PathString, reason: str, hint: Optional[str] = None) -> Path:
- tool = find_binary(*tools)
+def check_tool(config: Config, *tools: PathString, reason: str, hint: Optional[str] = None) -> Path:
+ tool = find_binary(*tools, root=config.tools())
if not tool:
die(f"Could not find '{tools[0]}' which is required to {reason}.", hint=hint)
return tool
-def check_systemd_tool(*tools: PathString, version: str, reason: str, hint: Optional[str] = None) -> None:
- tool = check_tool(*tools, reason=reason, hint=hint)
+def check_systemd_tool(
+ config: Config,
+ *tools: PathString,
+ version: str,
+ reason: str,
+ hint: Optional[str] = None,
+) -> None:
+ tool = check_tool(config, *tools, reason=reason, hint=hint)
- v = systemd_tool_version(tool)
+ v = systemd_tool_version(config, tool)
if v < version:
die(f"Found '{tool}' with version {v} but version {version} or newer is required to {reason}.",
hint=f"Use ToolsTree=default to get a newer version of '{tools[0]}'.")
-def check_tools(verb: Verb, config: Config) -> None:
+def check_tools(config: Config, verb: Verb) -> None:
if verb == Verb.build:
if want_efi(config):
check_systemd_tool(
+ config,
"ukify", "/usr/lib/systemd/ukify",
version="254",
reason="build bootable images",
)
if config.output_format in (OutputFormat.disk, OutputFormat.esp):
- check_systemd_tool("systemd-repart", version="254", reason="build disk images")
+ check_systemd_tool(config, "systemd-repart", version="254", reason="build disk images")
if config.selinux_relabel == ConfigFeature.enabled:
- check_tool("setfiles", reason="relabel files")
+ check_tool(config, "setfiles", reason="relabel files")
if verb == Verb.boot:
- check_systemd_tool("systemd-nspawn", version="254", reason="boot images")
+ check_systemd_tool(config, "systemd-nspawn", version="254", reason="boot images")
def configure_ssh(context: Context) -> None:
include=context.config.kernel_modules_include,
exclude=context.config.kernel_modules_exclude,
host=context.config.kernel_modules_include_host,
+ sandbox=context.sandbox(options=["--ro-bind", context.root, context.root]),
)
with complete_step(f"Running depmod for {kver}"):
- bwrap(context, ["depmod", "--all", "--basedir", context.root, kver])
+ run(["depmod", "--all", "--basedir", context.root, kver],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
def run_sysusers(context: Context) -> None:
- if not shutil.which("systemd-sysusers"):
+ if not find_binary("systemd-sysusers", root=context.config.tools()):
logging.info("systemd-sysusers is not installed, not generating system users")
return
with complete_step("Generating system users"):
- bwrap(context, ["systemd-sysusers", "--root", context.root])
+ run(["systemd-sysusers", "--root", context.root],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
def run_preset(context: Context) -> None:
- if not shutil.which("systemctl"):
+ if not find_binary("systemctl", root=context.config.tools()):
logging.info("systemctl is not installed, not applying presets")
return
with complete_step("Applying presets…"):
- bwrap(context, ["systemctl", "--root", context.root, "preset-all"])
- bwrap(context, ["systemctl", "--root", context.root, "--global", "preset-all"])
+ run(["systemctl", "--root", context.root, "preset-all"],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
+ run(["systemctl", "--root", context.root, "--global", "preset-all"],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
def run_hwdb(context: Context) -> None:
if context.config.overlay or context.config.output_format.is_extension_image():
return
- if not shutil.which("systemd-hwdb"):
+ if not find_binary("systemd-hwdb", root=context.config.tools()):
logging.info("systemd-hwdb is not installed, not generating hwdb")
return
with complete_step("Generating hardware database"):
- bwrap(context, ["systemd-hwdb", "--root", context.root, "--usr", "--strict", "update"])
+ run(["systemd-hwdb", "--root", context.root, "--usr", "--strict", "update"],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
# Remove any existing hwdb in /etc in favor of the one we just put in /usr.
(context.root / "etc/udev/hwdb.bin").unlink(missing_ok=True)
return
with complete_step("Applying first boot settings"):
- bwrap(context, ["systemd-firstboot", "--root", context.root, "--force", *options])
+ run(["systemd-firstboot", "--root", context.root, "--force", *options],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
# Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
# /usr/lib/credstore.
die("SELinux relabel is requested but could not find selinux config at /etc/selinux/config")
return
- policy = bwrap(context, ["sh", "-c", f". {selinux} && echo $SELINUXTYPE"], stdout=subprocess.PIPE).stdout.strip()
+ policy = run(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"],
+ sandbox=context.sandbox(options=["--ro-bind", selinux, selinux]),
+ stdout=subprocess.PIPE).stdout.strip()
if not policy:
if context.config.selinux_relabel == ConfigFeature.enabled:
die("SELinux relabel is requested but no selinux policy is configured in /etc/selinux/config")
return
- if not shutil.which("setfiles"):
+ if not find_binary("setfiles", root=context.config.tools()):
logging.info("setfiles is not installed, not relabeling files")
return
die(f"SELinux binary policy not found in {binpolicydir}")
with complete_step(f"Relabeling files using {policy} policy"):
- bwrap(context, ["setfiles", "-mFr", context.root, "-c", binpolicy, fc, context.root],
- check=context.config.selinux_relabel == ConfigFeature.enabled)
+ run(["setfiles", "-mFr", context.root, "-c", binpolicy, fc, context.root],
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]),
+ check=context.config.selinux_relabel == ConfigFeature.enabled)
def need_build_overlay(config: Config) -> bool:
final, build, manifest = cache_tree_paths(context.config)
with complete_step("Installing cache copies"):
- rmtree(final)
+ rmtree(final, sandbox=context.sandbox(options=["--bind", final.parent, final.parent]))
# We only use the cache-overlay directory for caching if we have a base tree, otherwise we just
# cache the root directory.
if (context.workspace / "cache-overlay").exists():
- move_tree(context.workspace / "cache-overlay", final, use_subvolumes=context.config.use_subvolumes)
+ move_tree(
+ context.workspace / "cache-overlay", final,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox(
+ options=[
+ "--bind", context.workspace, context.workspace,
+ "--bind", final.parent, final.parent,
+ ],
+ ),
+ )
else:
- move_tree(context.root, final, use_subvolumes=context.config.use_subvolumes)
+ move_tree(
+ context.root, final,
+ use_subvolumes=context.config.use_subvolumes,
+ sandbox=context.sandbox(
+ options=[
+ "--bind", context.root.parent, context.root.parent,
+ "--bind", final.parent, final.parent,
+ ],
+ ),
+ )
if need_build_overlay(context.config) and (context.workspace / "build-overlay").exists():
- rmtree(build)
- move_tree(context.workspace / "build-overlay", build, use_subvolumes=context.config.use_subvolumes)
+ rmtree(build, sandbox=context.sandbox(options=["--bind", build.parent, build.parent]))
+ move_tree(
+ context.workspace / "build-overlay", build,
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox(
+ options=[
+ "--bind", context.workspace, context.workspace,
+ "--bind", build.parent, build.parent,
+ ],
+ ),
+ )
manifest.write_text(
json.dumps(
return False
with complete_step("Copying cached trees"):
- copy_tree(final, context.root, use_subvolumes=context.config.use_subvolumes)
+ install_tree(context, final, context.root)
if need_build_overlay(context.config):
(context.workspace / "build-overlay").symlink_to(build)
"--seed", str(context.config.seed) if context.config.seed else "random",
context.staging / context.config.output_with_format,
]
+ options: list[PathString] = ["--bind", context.staging, context.staging]
if root:
cmdline += ["--root", root]
+ options += ["--bind", root, root]
if not context.config.architecture.is_native():
cmdline += ["--architecture", str(context.config.architecture)]
if not (context.staging / context.config.output_with_format).exists():
cmdline += ["--empty=create"]
if context.config.passphrase:
cmdline += ["--key-file", context.config.passphrase]
+ options += ["--ro-bind", context.config.passphrase, context.config.passphrase]
if context.config.verity_key:
cmdline += ["--private-key", context.config.verity_key]
+ options += ["--ro-bind", context.config.verity_key, context.config.verity_key]
if context.config.verity_certificate:
cmdline += ["--certificate", context.config.verity_certificate]
+ options += ["--ro-bind", context.config.verity_certificate, context.config.verity_certificate]
if skip:
cmdline += ["--defer-partitions", ",".join(skip)]
if split:
for d in definitions:
cmdline += ["--definitions", d]
+ options += ["--ro-bind", d, d]
env = {
option: value
with complete_step(msg):
output = json.loads(
- bwrap(context, cmdline, devices=not context.config.repart_offline, stdout=subprocess.PIPE, env=env).stdout
+ run(
+ cmdline,
+ stdout=subprocess.PIPE,
+ env=env,
+ sandbox=context.sandbox(devices=not context.config.repart_offline, options=options),
+ ).stdout
)
logging.debug(json.dumps(output, indent=4))
"--size=auto",
output,
]
+ options: list[PathString] = [
+ "--bind", output.parent, output.parent,
+ "--ro-bind", context.root, context.root,
+ ]
if not context.config.architecture.is_native():
cmdline += ["--architecture", str(context.config.architecture)]
if context.config.passphrase:
cmdline += ["--key-file", context.config.passphrase]
+ options += ["--ro-bind", context.config.passphrase, context.config.passphrase]
if context.config.verity_key:
cmdline += ["--private-key", context.config.verity_key]
+ options += ["--ro-bind", context.config.verity_key, context.config.verity_key]
if context.config.verity_certificate:
cmdline += ["--certificate", context.config.verity_certificate]
+ options += ["--ro-bind", context.config.verity_certificate, context.config.verity_certificate]
if context.config.sector_size:
cmdline += ["--sector-size", str(context.config.sector_size)]
resource_path(mkosi.resources, f"repart/definitions/{context.config.output_format}.repart.d") as r,
complete_step(f"Building {context.config.output_format} extension image")
):
- bwrap(
- context,
+ options += ["--ro-bind", r, r]
+ run(
cmdline + ["--definitions", r],
- devices=not context.config.repart_offline,
env=env,
+ sandbox=context.sandbox(devices=not context.config.repart_offline, options=options),
)
# Make sure all build outputs that are not directories are owned by the user running mkosi.
if not f.is_dir():
os.chown(f, INVOKING_USER.uid, INVOKING_USER.gid, follow_symlinks=False)
- move_tree(f, context.config.output_dir_or_cwd(), use_subvolumes=context.config.use_subvolumes)
+ move_tree(
+ f, context.config.output_dir_or_cwd(),
+ use_subvolumes=context.config.use_subvolumes,
+ tools=context.config.tools(),
+ sandbox=context.sandbox(
+ options=[
+ "--bind", context.staging, context.staging,
+ "--bind", context.config.output_dir_or_cwd(), context.config.output_dir_or_cwd(),
+ ],
+ ),
+ )
def normalize_mtime(root: Path, mtime: Optional[int], directory: Optional[Path] = None) -> None:
def setup_workspace(args: Args, config: Config) -> Iterator[Path]:
with contextlib.ExitStack() as stack:
workspace = Path(tempfile.mkdtemp(dir=config.workspace_dir_or_default(), prefix="mkosi-workspace"))
- stack.callback(lambda: rmtree(workspace))
+ sandbox = config.sandbox(
+ options=["--bind", config.workspace_dir_or_default(), config.workspace_dir_or_default()],
+ )
+ stack.callback(lambda: rmtree(workspace, sandbox=sandbox))
with scopedenv({"TMPDIR" : os.fspath(workspace)}):
try:
finalize_staging(context)
- print_output_size(config.output_dir_or_cwd() / config.output)
+ print_output_size(config.output_dir_or_cwd() / config.output_with_compression)
-def setfacl(root: Path, uid: int, allow: bool) -> None:
- run(["setfacl",
- "--physical",
- "--modify" if allow else "--remove",
- f"user:{uid}:rwx" if allow else f"user:{uid}",
- "-"],
- # Supply files via stdin so we don't clutter --debug run output too much
- input="\n".join([str(root), *(os.fspath(p) for p in root.rglob("*") if p.is_dir())]),
+def setfacl(config: Config, root: Path, uid: int, allow: bool) -> None:
+ run(
+ [
+ "setfacl",
+ "--physical",
+ "--modify" if allow else "--remove",
+ f"user:{uid}:rwx" if allow else f"user:{uid}",
+ "-",
+ ],
+ # Supply files via stdin so we don't clutter --debug run output too much
+ input="\n".join([str(root), *(os.fspath(p) for p in root.rglob("*") if p.is_dir())]),
+ sandbox=config.sandbox(options=["--bind", root, root]),
)
# getfacl complains about absolute paths so make sure we pass a relative one.
if root.exists():
- has_acl = f"user:{uid}:rwx" in run(["getfacl", "-n", root.relative_to(Path.cwd())],
- stdout=subprocess.PIPE).stdout
+ sandbox = config.sandbox(options=["--bind", root, root, "--chdir", root])
+ has_acl = f"user:{uid}:rwx" in run(["getfacl", "-n", "."], sandbox=sandbox, stdout=subprocess.PIPE).stdout
if not has_acl and not always:
yield
try:
if has_acl:
with complete_step(f"Removing ACLs from {root}"):
- setfacl(root, uid, allow=False)
+ setfacl(config, root, uid, allow=False)
yield
finally:
if has_acl or always:
with complete_step(f"Adding ACLs to {root}"):
- setfacl(root, uid, allow=True)
+ setfacl(config, root, uid, allow=True)
@contextlib.contextmanager
if config.output_format.use_outer_compression() and config.compress_output:
die(f"Sorry, can't {opname} a compressed image.")
- cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
+ cmdline: list[PathString] = ["systemd-nspawn", "--quiet", "--link-journal=no"]
# If we copied in a .nspawn file, make sure it's actually honoured
if config.nspawn_settings:
with contextlib.ExitStack() as stack:
if config.nspawn_settings:
- copy_tree(config.nspawn_settings, config.output_dir_or_cwd() / f"{name}.nspawn")
- stack.callback(lambda: rmtree(config.output_dir_or_cwd() / f"{name}.nspawn"))
+ shutil.copy2(config.nspawn_settings, config.output_dir_or_cwd() / f"{name}.nspawn")
+ stack.callback(lambda: (config.output_dir_or_cwd() / f"{name}.nspawn").unlink())
if config.ephemeral:
fname = stack.enter_context(copy_ephemeral(config, config.output_dir_or_cwd() / config.output))
fname = config.output_dir_or_cwd() / config.output
if config.output_format == OutputFormat.disk and args.verb == Verb.boot:
- run(["systemd-repart",
- "--image", fname,
- *([f"--size={config.runtime_size}"] if config.runtime_size else []),
- "--no-pager",
- "--dry-run=no",
- "--offline=no",
- fname],
- stdin=sys.stdin)
+ run(
+ [
+ "systemd-repart",
+ "--image", fname,
+ *([f"--size={config.runtime_size}"] if config.runtime_size else []),
+ "--no-pager",
+ "--dry-run=no",
+ "--offline=no",
+ fname,
+ ],
+ stdin=sys.stdin,
+ sandbox=config.sandbox(network=True, devices=True, options=["--bind", fname, fname]),
+ )
if config.output_format == OutputFormat.directory:
cmdline += ["--directory", fname]
cmdline += ["--"]
cmdline += args.cmdline
- run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
+ run(
+ cmdline,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ env=os.environ,
+ log=False,
+ sandbox=config.sandbox(devices=True, network=True, relaxed=True),
+ )
def run_systemd_tool(tool: str, args: Args, config: Config) -> None:
):
die(f"Must be root to run the {args.verb} command")
- if (tool_path := find_binary(tool)) is None:
+ if (tool_path := find_binary(tool, root=config.tools())) is None:
die(f"Failed to find {tool}")
if config.ephemeral:
stdin=sys.stdin,
stdout=sys.stdout,
env=os.environ,
- log=False
+ log=False,
+ preexec_fn=become_root,
+ sandbox=config.sandbox(network=True, devices=config.output_format == OutputFormat.disk, relaxed=True),
)
run_systemd_tool("coredumpctl", args, config)
-def run_serve(config: Config) -> None:
+def run_serve(args: Args, config: Config) -> None:
"""Serve the output directory via a tiny HTTP server"""
- port = "8081"
-
- with chdir(config.output_dir_or_cwd()):
- run([python_binary(config), "-m", "http.server", port],
- user=INVOKING_USER.uid, group=INVOKING_USER.gid, stdin=sys.stdin, stdout=sys.stdout)
+ run([python_binary(config), "-m", "http.server", "8081"],
+ stdin=sys.stdin, stdout=sys.stdout,
+ sandbox=config.sandbox(network=True, relaxed=True, options=["--chdir", config.output_dir_or_cwd()]))
def generate_key_cert_pair(args: Args) -> None:
run(["man", "--local-file", man])
return
elif form == DocFormat.pandoc:
- if not shutil.which("pandoc"):
+ if not find_binary("pandoc"):
logging.error("pandoc is not available")
with resource_path(mkosi.resources, "mkosi.md") as mdr:
pandoc = run(["pandoc", "-t", "man", "-s", mdr], stdout=subprocess.PIPE)
def run_clean(args: Args, config: Config) -> None:
become_root()
- unlink_output(args, config)
+
+ # We remove any cached images if either the user used --force twice, or they called "clean" with it
+ # passed once. Let's also remove the downloaded package cache if the user specified one additional
+ # "--force".
+
+ if args.verb == Verb.clean:
+ remove_build_cache = args.force > 0
+ remove_package_cache = args.force > 1
+ else:
+ remove_build_cache = args.force > 1
+ remove_package_cache = args.force > 2
+
+ with complete_step("Removing output files…"):
+ if config.output_dir_or_cwd().exists():
+ rmtree(*(p for p in config.output_dir_or_cwd().iterdir() if p.name.startswith(config.output)))
+
+ if remove_build_cache:
+ if config.cache_dir:
+ with complete_step("Removing cache entries…"):
+ rmtree(*(p for p in cache_tree_paths(config) if p.exists()))
+
+ if config.build_dir and config.build_dir.exists() and any(config.build_dir.iterdir()):
+ with complete_step("Clearing out build directory…"):
+ rmtree(*config.build_dir.iterdir())
+
+ if remove_package_cache:
+ if config.cache_dir and config.cache_dir.exists() and any(config.cache_dir.iterdir()):
+ with complete_step("Clearing out package cache…"):
+ rmtree(
+ *(
+ config.cache_dir / p / d
+ for p in ("cache", "lib")
+ for d in ("apt", "dnf", "libdnf5", "pacman", "zypp")
+ ),
+ )
def run_build(args: Args, config: Config) -> None:
with (
complete_step(f"Building {config.name()} image"),
- mount_usr(config.tools_tree),
prepend_to_environ_path(config),
):
# After tools have been mounted, check if we have what we need
- check_tools(Verb.build, config)
+ check_tools(config, Verb.build)
# Create these as the invoking user to make sure they're owned by the user running mkosi.
for p in (
if args.verb == Verb.build and not args.force:
check_outputs(config)
- # Because we overmount /usr when using a tools tree, we need to make sure we load all python modules we
- # might end up using before overmounting /usr. Any modules that might be dynamically loaded during
- # execution are forcibly loaded early here.
- try_import("importlib.readers")
- try_import("importlib.resources.readers")
- for config in images:
- try_import(f"mkosi.distributions.{config.distribution}")
-
# First, process all directory removals because otherwise if different images share directories a later
# image build could end up deleting the output generated by an earlier image build.
last = images[-1]
- # After we unshare the user namespace, we might not have access to /dev/kvm or related device nodes anymore as
- # access to these might be gated behind the kvm group and we won't be part of the kvm group anymore after
- # unsharing the user namespace. To get around this, open all those device nodes now while we still can so we
- # can pass them as file descriptors to qemu later. Note that we can't pass the kvm file descriptor to qemu
- # until https://gitlab.com/qemu-project/qemu/-/issues/1936 is resolved.
- qemu_device_fds = {
- d: d.open()
- for d in QemuDeviceNode
- if args.verb == Verb.qemu and d.feature(last) != ConfigFeature.disabled and d.available(log=True)
- }
-
- if last.tools_tree and args.verb != Verb.ssh:
- become_root()
+ with prepend_to_environ_path(last):
+ check_tools(last, args.verb)
- with contextlib.ExitStack() as stack:
- if os.getuid() == 0 and args.verb != Verb.ssh:
- init_mount_namespace()
- stack.enter_context(mount_usr(last.tools_tree, umount=False))
-
- stack.enter_context(prepend_to_environ_path(last))
-
- check_tools(args.verb, last)
-
- with prepend_to_environ_path(last):
- if args.verb in (Verb.shell, Verb.boot):
- with acl_toggle_boot(last, INVOKING_USER.uid):
- run_shell(args, last)
-
- if args.verb == Verb.qemu:
- run_qemu(args, last, qemu_device_fds)
-
- if args.verb == Verb.ssh:
- run_ssh(args, last)
-
- if args.verb == Verb.serve:
- run_serve(last)
-
- if args.verb == Verb.journalctl:
- run_journalctl(args, last)
-
- if args.verb == Verb.coredumpctl:
- run_coredumpctl(args, last)
-
- if args.verb == Verb.burn:
- run_burn(args, last)
+ with (
+ acl_toggle_boot(last, INVOKING_USER.uid)
+ if args.verb in (Verb.shell, Verb.boot)
+ else contextlib.nullcontext()
+ ):
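+ # All handlers share the (args, config) signature (run_serve was adjusted above to match),
+ # so the verb can be dispatched through a simple lookup table.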
+ {
+ Verb.shell: run_shell,
+ Verb.boot: run_shell,
+ Verb.qemu: run_qemu,
+ Verb.ssh: run_ssh,
+ Verb.serve: run_serve,
+ Verb.journalctl: run_journalctl,
+ Verb.coredumpctl: run_coredumpctl,
+ Verb.burn: run_burn,
+ }[args.verb](args, last)
# PYTHON_ARGCOMPLETE_OK
import faulthandler
-import shutil
import signal
import sys
from types import FrameType
from mkosi import run_verb
from mkosi.config import parse_config
from mkosi.log import log_setup
-from mkosi.run import run, uncaught_exception_handler
+from mkosi.run import find_binary, run, uncaught_exception_handler
from mkosi.util import INVOKING_USER
try:
run_verb(args, images)
finally:
- if sys.stderr.isatty() and shutil.which("tput"):
+ if sys.stderr.isatty() and find_binary("tput"):
run(["tput", "cnorm"], check=False)
run(["tput", "smam"], check=False)
# SPDX-License-Identifier: LGPL-2.1+
import os
-import shutil
from collections.abc import Iterable
from pathlib import Path
from typing import Optional
-from mkosi.bubblewrap import bwrap
from mkosi.context import Context
from mkosi.log import log_step
-from mkosi.mounts import finalize_passwd_mounts
+from mkosi.run import find_binary, run
+from mkosi.sandbox import finalize_passwd_mounts
-def tar_binary() -> str:
+def tar_binary(context: Context) -> str:
# Some distros (Mandriva) install BSD tar as "tar", hence prefer
# "gtar" if it exists, which should be GNU tar wherever it exists.
# We are interested in exposing the same behaviour everywhere, hence
# it's preferable to use the same implementation of tar
# everywhere. In particular given the limited/different SELinux
# support in BSD tar and the different command line syntax
# compared to GNU tar.
- return "gtar" if shutil.which("gtar") else "tar"
+ return "gtar" if find_binary("gtar", root=context.config.tools()) else "tar"
-def cpio_binary() -> str:
- return "gcpio" if shutil.which("gcpio") else "cpio"
+def cpio_binary(context: Context) -> str:
+ return "gcpio" if find_binary("gcpio", root=context.config.tools()) else "cpio"
def tar_exclude_apivfs_tmp() -> list[str]:
def make_tar(context: Context, src: Path, dst: Path) -> None:
log_step(f"Creating tar archive {dst}…")
- bwrap(
- context,
+ run(
[
- tar_binary(),
+ tar_binary(context),
"--create",
"--file", dst,
"--directory", src,
".",
],
# Make sure tar uses user/group information from the root directory instead of the host.
- options=finalize_passwd_mounts(src) if (src / "etc/passwd").exists() else [],
+ sandbox=context.sandbox(
+ options=[
+ "--bind", dst.parent, dst.parent,
+ "--ro-bind", src, src,
+ *(finalize_passwd_mounts(src) if (src / "etc/passwd").exists() else []),
+ ],
+ ),
)
def extract_tar(context: Context, src: Path, dst: Path, log: bool = True) -> None:
if log:
log_step(f"Extracting tar archive {src}…")
- bwrap(
- context,
+ run(
[
- tar_binary(),
+ tar_binary(context),
"--extract",
"--file", src,
"--directory", dst,
*tar_exclude_apivfs_tmp(),
],
# Make sure tar uses user/group information from the root directory instead of the host.
- options=finalize_passwd_mounts(dst) if (dst / "etc/passwd").exists() else [],
+ sandbox=context.sandbox(
+ options=[
+ "--bind", dst, dst,
+ "--ro-bind", src, src,
+ *(finalize_passwd_mounts(dst) if (dst / "etc/passwd").exists() else []),
+ ],
+ ),
)
files = sorted(files)
log_step(f"Creating cpio archive {dst}…")
- bwrap(
- context,
+ run(
[
- cpio_binary(),
+ cpio_binary(context),
"--create",
"--reproducible",
"--null",
],
input="\0".join(os.fspath(f.relative_to(src)) for f in files),
# Make sure cpio uses user/group information from the root directory instead of the host.
- options=finalize_passwd_mounts(dst),
+ sandbox=context.sandbox(
+ options=[
+ "--bind", dst.parent, dst.parent,
+ "--ro-bind", src, src,
+ *finalize_passwd_mounts(dst),
+ ],
+ ),
)
stdout=sys.stdout,
env=os.environ,
log=False,
+ sandbox=config.sandbox(devices=True, network=True, relaxed=True),
)
from mkosi.distributions import Distribution, detect_distribution
from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, Style, die
from mkosi.pager import page
-from mkosi.run import run
+from mkosi.run import find_binary, run
+from mkosi.sandbox import sandbox_cmd
from mkosi.types import PathString, SupportsRead
from mkosi.util import INVOKING_USER, StrEnum, chdir, flatten, is_power_of_2
from mkosi.versioncomp import GenericVersion
return Path("/var/tmp")
+ def tools(self) -> Path:
+ return self.tools_tree or Path("/")
+
@classmethod
def default(cls) -> "Config":
"""Alternative constructor to generate an all-default MkosiArgs.
j = cls._load_json(s)
return dataclasses.replace(cls.default(), **j)
+ def sandbox(
+ self,
+ *,
+ network: bool = False,
+ devices: bool = False,
+ relaxed: bool = False,
+ scripts: Optional[Path] = None,
+ options: Sequence[PathString] = (),
+ ) -> list[PathString]:
+ mounts: list[PathString] = (
+ flatten(("--ro-bind", d, d) for d in self.extra_search_paths)
+ if not relaxed and not self.tools_tree
+ else []
+ )
+
+ return sandbox_cmd(
+ network=network,
+ devices=devices,
+ relaxed=relaxed,
+ scripts=scripts,
+ tools=self.tools(),
+ options=[*options, *mounts],
+ )
+
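A usage sketch (paths and command hypothetical): the returned value is a plain bwrap argv prefix, which run() prepends to the command, so the command executes with /usr taken from the tools tree.

    # config.sandbox(network=True, options=["--bind", "/work/cache", "/work/cache"])
    #   ~> ["bwrap", "--ro-bind", "<tools>/usr", "/usr", ..., "--bind", "/work/cache", "/work/cache"]
    #
    # run(["dnf", "makecache"], sandbox=config.sandbox(network=True))
    #   ~> executes: bwrap --ro-bind <tools>/usr /usr ... dnf makecache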
def parse_ini(path: Path, only_sections: Collection[str] = ()) -> Iterator[tuple[str, str, str]]:
"""
key, _, value = s.partition("=")
creds[key] = value
- if "firstboot.timezone" not in creds and shutil.which("timedatectl"):
+ if "firstboot.timezone" not in creds and find_binary("timedatectl"):
tz = run(
["timedatectl", "show", "-p", "Timezone", "--value"],
stdout=subprocess.PIPE,
# SPDX-License-Identifier: LGPL-2.1+
+import os
+from collections.abc import Sequence
from pathlib import Path
+from typing import Optional
from mkosi.config import Args, Config
from mkosi.tree import make_tree
+from mkosi.types import PathString
from mkosi.util import umask
if config.overlay:
self.root.mkdir()
else:
- make_tree(self.root, use_subvolumes=self.config.use_subvolumes)
+ make_tree(
+ self.root,
+ use_subvolumes=self.config.use_subvolumes,
+ tools=config.tools(),
+ sandbox=config.sandbox(options=["--bind", self.workspace, self.workspace]),
+ )
self.staging.mkdir()
self.pkgmngr.mkdir()
@property
def install_dir(self) -> Path:
return self.workspace / "dest"
+
+ def sandbox(
+ self,
+ *,
+ network: bool = False,
+ devices: bool = False,
+ scripts: Optional[Path] = None,
+ options: Sequence[PathString] = (),
+ ) -> list[PathString]:
+ return self.config.sandbox(
+ network=network,
+ devices=devices,
+ scripts=scripts,
+ options=[
+ # This mount is writable so bwrap can create extra directories or symlinks inside of it as
+ # needed. This isn't a problem as the package manager directory is created by mkosi and
+ # thrown away when the build finishes.
+ "--bind", self.pkgmngr / "etc", "/etc",
+ *options,
+ *(["--ro-bind", os.fspath(p), os.fspath(p)] if (p := self.pkgmngr / "usr").exists() else []),
+ ],
+ ) + (
+ [
+ "sh",
+ "-c",
+ f"mount -t overlay -o lowerdir={self.pkgmngr / 'usr'}:/usr overlayfs /usr && exec $0 \"$@\"",
+ ] if (self.pkgmngr / "usr").exists() else []
+ )
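Because the sandbox is a plain argv prefix, the overlay branch above composes as follows (sketch; the command is hypothetical):

    # context.sandbox(...) + ["dnf", "install", "systemd"] expands roughly to:
    #   bwrap ... --bind <pkgmngr>/etc /etc \
    #     sh -c 'mount -t overlay -o lowerdir=<pkgmngr>/usr:/usr overlayfs /usr && exec $0 "$@"' \
    #     dnf install systemd
    # i.e. the sh wrapper first overlays the package manager's /usr tree on top of /usr
    # inside the namespace, then execs the real command with its arguments.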
import os
import shutil
from collections.abc import Iterable, Sequence
-from pathlib import Path
from mkosi.config import Architecture
from mkosi.context import Context
from mkosi.versioncomp import GenericVersion
-def move_rpm_db(root: Path) -> None:
+def move_rpm_db(context: Context) -> None:
"""Link /var/lib/rpm to /usr/lib/sysimage/rpm for compat with old rpm"""
- olddb = root / "var/lib/rpm"
- newdb = root / "usr/lib/sysimage/rpm"
+ olddb = context.root / "var/lib/rpm"
+ newdb = context.root / "usr/lib/sysimage/rpm"
if newdb.exists() and not newdb.is_symlink():
with complete_step("Moving rpm database /usr/lib/sysimage/rpm → /var/lib/rpm"):
- rmtree(olddb)
+ rmtree(olddb, sandbox=context.sandbox(options=["--bind", olddb.parent, olddb.parent]))
shutil.move(newdb, olddb)
newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent))
# On Fedora, the default rpmdb has moved to /usr/lib/sysimage/rpm so if that's the case we
# need to move it back to /var/lib/rpm on CentOS.
- move_rpm_db(context.root)
+ move_rpm_db(context)
@classmethod
def install_packages(cls, context: Context, packages: Sequence[str], apivfs: bool = True) -> None:
from pathlib import Path
from mkosi.archive import extract_tar
-from mkosi.bubblewrap import bwrap
from mkosi.config import Architecture
from mkosi.context import Context
from mkosi.distributions import Distribution, DistributionInstaller, PackageType
from mkosi.installer.apt import invoke_apt, setup_apt
from mkosi.log import die
+from mkosi.run import run
from mkosi.util import umask
# By configuring Debug::pkgDpkgPm=1, apt-get install will not actually execute any dpkg commands, so
# all it does is download the essential debs and tell us their full path in the apt cache without actually
# installing them.
- with tempfile.NamedTemporaryFile(mode="r") as f:
+ with tempfile.NamedTemporaryFile(dir="/tmp", mode="r") as f:
cls.install_packages(context, [
"-oDebug::pkgDPkgPm=1",
f"-oDPkg::Pre-Install-Pkgs::=cat >{f.name}",
# then extracting the tar file into the chroot.
for deb in essential:
- with tempfile.NamedTemporaryFile() as f:
- bwrap(context, ["dpkg-deb", "--fsys-tarfile", deb], stdout=f)
- extract_tar(context, Path(f.name), context.root, log=False)
+ with open(deb, "rb") as i, tempfile.NamedTemporaryFile() as o:
+ run(["dpkg-deb", "--fsys-tarfile", "/dev/stdin"], stdin=i, stdout=o, sandbox=context.sandbox())
+ extract_tar(context, Path(o.name), context.root, log=False)
# Finally, run apt to properly install packages in the chroot without having to worry that maintainer
# scripts won't find basic tools that they depend on.
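For reference, the essential-deb extraction above in isolation, as a minimal runnable sketch (package and target paths are hypothetical; assumes dpkg-deb and GNU tar on the host):

    import subprocess
    import tempfile

    # Stream the .deb through dpkg-deb to obtain its filesystem tree as a tar archive,
    # then unpack it without running any maintainer scripts.
    with open("base-files.deb", "rb") as i, tempfile.NamedTemporaryFile() as o:
        subprocess.run(["dpkg-deb", "--fsys-tarfile", "/dev/stdin"], stdin=i, stdout=o, check=True)
        subprocess.run(["tar", "--extract", "--file", o.name, "--directory", "rootfs"], check=True)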
from pathlib import Path
from mkosi.archive import extract_tar
-from mkosi.bubblewrap import apivfs_cmd, bwrap, chroot_cmd
from mkosi.config import Architecture
from mkosi.context import Context
from mkosi.distributions import (
)
from mkosi.log import ARG_DEBUG, complete_step, die
from mkosi.run import run
+from mkosi.sandbox import apivfs_cmd, chroot_cmd
from mkosi.tree import copy_tree, rmtree
from mkosi.types import PathString
from mkosi.util import sort_packages
def invoke_emerge(context: Context, packages: Sequence[str] = (), apivfs: bool = True) -> None:
- bwrap(
- context,
- cmd=apivfs_cmd(context.root) + [
+ run(
+ apivfs_cmd(context.root, tools=context.config.tools()) + [
# We can't mount the stage 3 /usr using `options`, because apivfs_cmd() requires bwrap, which
# isn't available in the stage 3 tarball, so we have to mount /usr from the tarball later
# using another bwrap exec.
f"--root={context.root}",
*sort_packages(packages),
],
- network=True,
- options=[
- # TODO: Get rid of as many of these as possible.
- "--bind", context.cache_dir / "stage3/etc", "/etc",
- "--bind", context.cache_dir / "stage3/var", "/var",
- "--ro-bind", "/etc/resolv.conf", "/etc/resolv.conf",
- "--bind", context.cache_dir / "repos", "/var/db/repos",
- ],
+ sandbox=context.sandbox(
+ network=True,
+ options=[
+ # TODO: Get rid of as many of these as possible.
+ "--bind", context.cache_dir / "stage3/etc", "/etc",
+ "--bind", context.cache_dir / "stage3/var", "/var",
+ "--ro-bind", "/etc/resolv.conf", "/etc/resolv.conf",
+ "--bind", context.cache_dir / "repos", "/var/db/repos",
+ ],
+ ),
env=dict(
PKGDIR=str(context.cache_dir / "binpkgs"),
DISTDIR=str(context.cache_dir / "distfiles"),
if stage3_tar.exists():
cmd += ["--time-cond", stage3_tar]
- run(cmd)
+ run(cmd, sandbox=context.sandbox())
if stage3_tar.stat().st_mtime > old:
rmtree(stage3)
chroot = chroot_cmd(
stage3,
+ tools=context.config.tools(),
options=["--bind", context.cache_dir / "repos", "/var/db/repos"],
)
- bwrap(context, cmd=chroot + ["emerge-webrsync"], network=True)
+ run(chroot + ["emerge-webrsync"], sandbox=context.sandbox(network=True))
invoke_emerge(context, packages=["sys-apps/baselayout"], apivfs=False)
# SPDX-License-Identifier: LGPL-2.1+
-import shutil
import tempfile
import xml.etree.ElementTree as ElementTree
from collections.abc import Sequence
from mkosi.installer.rpm import RpmRepository
from mkosi.installer.zypper import invoke_zypper, setup_zypper
from mkosi.log import die
-from mkosi.run import run
+from mkosi.run import find_binary, run
class Installer(DistributionInstaller):
release_url = f"{mirror}/distribution/leap/{release}/repo/oss/"
updates_url = f"{mirror}/update/leap/{release}/oss/"
- zypper = shutil.which("zypper")
+ zypper = find_binary("zypper", root=context.config.tools())
# If we need to use a local mirror, create a temporary repository definition
# that doesn't get in the image, as it is valid only at image build time.
if context.config.local_mirror:
- repos = [RpmRepository("local-mirror", f"baseurl={context.config.local_mirror}", ())]
+ repos = [RpmRepository(id="local-mirror", url=f"baseurl={context.config.local_mirror}", gpgurls=())]
else:
repos = [
- RpmRepository("repo-oss", f"baseurl={release_url}", fetch_gpgurls(release_url) if not zypper else ()),
+ RpmRepository(
+ id="repo-oss",
+ url=f"baseurl={release_url}",
+ gpgurls=fetch_gpgurls(context, release_url) if not zypper else (),
+ ),
]
if updates_url is not None:
repos += [
RpmRepository(
- "repo-update",
- f"baseurl={updates_url}",
- fetch_gpgurls(updates_url) if not zypper else (),
+ id="repo-update",
+ url=f"baseurl={updates_url}",
+ gpgurls=fetch_gpgurls(context, updates_url) if not zypper else (),
)
]
@classmethod
def install_packages(cls, context: Context, packages: Sequence[str], apivfs: bool = True) -> None:
- if shutil.which("zypper"):
+ if find_binary("zypper", root=context.config.tools()):
options = [
"--download", "in-advance",
"--recommends" if context.config.with_recommends else "--no-recommends",
@classmethod
def remove_packages(cls, context: Context, packages: Sequence[str]) -> None:
- if shutil.which("zypper"):
+ if find_binary("zypper", root=context.config.tools()):
invoke_zypper(context, "remove", packages, ["--clean-deps"])
else:
invoke_dnf(context, "remove", packages)
return a
-def fetch_gpgurls(repourl: str) -> tuple[str, ...]:
+def fetch_gpgurls(context: Context, repourl: str) -> tuple[str, ...]:
gpgurls = [f"{repourl}/repodata/repomd.xml.key"]
with tempfile.TemporaryDirectory() as d:
- run([
- "curl",
- "--location",
- "--output-dir", d,
- "--remote-name",
- "--no-progress-meter",
- "--fail",
- f"{repourl}/repodata/repomd.xml",
- ])
+ run(
+ [
+ "curl",
+ "--location",
+ "--output-dir", d,
+ "--remote-name",
+ "--no-progress-meter",
+ "--fail",
+ f"{repourl}/repodata/repomd.xml",
+ ],
+ sandbox=context.sandbox(network=True, options=["--bind", d, d]),
+ )
xml = (Path(d) / "repomd.xml").read_text()
root = ElementTree.fromstring(xml)
import os
-from mkosi.bubblewrap import apivfs_cmd
from mkosi.config import ConfigFeature
from mkosi.context import Context
from mkosi.installer.apt import apt_cmd
from mkosi.installer.pacman import pacman_cmd
from mkosi.installer.rpm import rpm_cmd
from mkosi.installer.zypper import zypper_cmd
+from mkosi.sandbox import apivfs_cmd
from mkosi.tree import rmtree
from mkosi.types import PathString
if not always and os.access(context.root / "usr" / bin / tool, mode=os.F_OK, follow_symlinks=False):
break
else:
- for p in paths:
- rmtree(context.root / p)
+ rmtree(*(context.root / p for p in paths),
+ sandbox=context.sandbox(options=["--bind", context.root, context.root]))
def package_manager_scripts(context: Context) -> dict[str, list[PathString]]:
return {
- "pacman": apivfs_cmd(context.root) + pacman_cmd(context),
- "zypper": apivfs_cmd(context.root) + zypper_cmd(context),
- "dnf" : apivfs_cmd(context.root) + dnf_cmd(context),
- "rpm" : apivfs_cmd(context.root) + rpm_cmd(context),
+ "pacman": apivfs_cmd(context.root, tools=context.config.tools()) + pacman_cmd(context),
+ "zypper": apivfs_cmd(context.root, tools=context.config.tools()) + zypper_cmd(context),
+ "dnf" : apivfs_cmd(context.root, tools=context.config.tools()) + dnf_cmd(context),
+ "rpm" : apivfs_cmd(context.root, tools=context.config.tools()) + rpm_cmd(context),
} | {
- command: apivfs_cmd(context.root) + apt_cmd(context, command) for command in (
+ command: apivfs_cmd(context.root, tools=context.config.tools()) + apt_cmd(context, command) for command in (
"apt",
"apt-cache",
"apt-cdrom",
# SPDX-License-Identifier: LGPL-2.1+
-import shutil
import textwrap
from collections.abc import Sequence
-from mkosi.bubblewrap import apivfs_cmd, bwrap
from mkosi.context import Context
+from mkosi.run import find_binary, run
+from mkosi.sandbox import apivfs_cmd, finalize_crypto_mounts
from mkosi.types import PathString
from mkosi.util import sort_packages, umask
"-o", f"Dir::State={context.cache_dir / 'lib/apt'}",
"-o", f"Dir::State::Status={context.root / 'var/lib/dpkg/status'}",
"-o", f"Dir::Log={context.workspace}",
- "-o", f"Dir::Bin::DPkg={shutil.which('dpkg')}",
+ "-o", f"Dir::Bin::DPkg={find_binary('dpkg', root=context.config.tools())}",
"-o", "Debug::NoLocking=true",
"-o", f"DPkg::Options::=--root={context.root}",
"-o", "DPkg::Options::=--force-unsafe-io",
packages: Sequence[str] = (),
apivfs: bool = True,
) -> None:
- cmd = apivfs_cmd(context.root) if apivfs else []
- bwrap(context, cmd + apt_cmd(context, command) + [operation, *sort_packages(packages)],
- network=True, env=context.config.environment)
+ run(
+ apt_cmd(context, command) + [operation, *sort_packages(packages)],
+ sandbox=(
+ context.sandbox(
+ network=True,
+ options=[
+ "--bind", context.root, context.root,
+ "--bind", context.cache_dir, context.cache_dir,
+ "--ro-bind", context.workspace / "apt.conf", context.workspace / "apt.conf",
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ ],
+ ) + (apivfs_cmd(context.root, tools=context.config.tools()) if apivfs else [])
+ ),
+ env=context.config.environment,
+ )
# SPDX-License-Identifier: LGPL-2.1+
-import shutil
import textwrap
from collections.abc import Iterable
+from pathlib import Path
-from mkosi.bubblewrap import apivfs_cmd, bwrap
from mkosi.context import Context
from mkosi.installer.rpm import RpmRepository, fixup_rpmdb_location, setup_rpm
+from mkosi.run import find_binary, run
+from mkosi.sandbox import apivfs_cmd, finalize_crypto_mounts
from mkosi.types import PathString
from mkosi.util import sort_packages
def dnf_executable(context: Context) -> str:
# Allow the user to override autodetection with an environment variable
dnf = context.config.environment.get("MKOSI_DNF")
+ root = context.config.tools()
- return dnf or shutil.which("dnf5") or shutil.which("dnf") or "yum"
+ return Path(dnf or find_binary("dnf5", root=root) or find_binary("dnf", root=root) or "yum").name
def setup_dnf(context: Context, repositories: Iterable[RpmRepository], filelists: bool = True) -> None:
def invoke_dnf(context: Context, command: str, packages: Iterable[str], apivfs: bool = True) -> None:
- cmd = apivfs_cmd(context.root) if apivfs else []
- bwrap(context, cmd + dnf_cmd(context) + [command, *sort_packages(packages)],
- network=True, env=context.config.environment)
-
- fixup_rpmdb_location(context.root)
+ run(
+ dnf_cmd(context) + [command, *sort_packages(packages)],
+ sandbox=(
+ context.sandbox(
+ network=True,
+ options=[
+ "--bind", context.root, context.root,
+ "--bind", context.cache_dir, context.cache_dir,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ ],
+ ) + (apivfs_cmd(context.root, tools=context.config.tools()) if apivfs else [])
+ ),
+ env=context.config.environment,
+ )
+
+ fixup_rpmdb_location(context)
# The log directory is always interpreted relative to the install root so there's nothing we can do
# but remove the log files from the install root afterwards.
from collections.abc import Iterable, Sequence
from typing import NamedTuple
-from mkosi.bubblewrap import apivfs_cmd, bwrap
from mkosi.context import Context
+from mkosi.run import run
+from mkosi.sandbox import apivfs_cmd, finalize_crypto_mounts
from mkosi.types import PathString
from mkosi.util import sort_packages, umask
packages: Sequence[str] = (),
apivfs: bool = True,
) -> None:
- cmd = apivfs_cmd(context.root) if apivfs else []
- bwrap(context, cmd + pacman_cmd(context) + [operation, *options, *sort_packages(packages)],
- network=True, env=context.config.environment)
+ run(
+ pacman_cmd(context) + [operation, *options, *sort_packages(packages)],
+ sandbox=(
+ context.sandbox(
+ network=True,
+ options=[
+ "--bind", context.root, context.root,
+ "--bind", context.cache_dir, context.cache_dir,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ ],
+ ) + (apivfs_cmd(context.root, tools=context.config.tools()) if apivfs else [])
+ ),
+ env=context.config.environment,
+ )
from pathlib import Path
from typing import NamedTuple, Optional
-from mkosi.bubblewrap import bwrap
from mkosi.context import Context
+from mkosi.run import run
from mkosi.tree import rmtree
from mkosi.types import PathString
def find_rpm_gpgkey(context: Context, key: str, url: str) -> str:
- gpgpath = next(Path("/usr/share/distribution-gpg-keys").rglob(key), None)
+ gpgpath = next((context.config.tools() / "usr/share/distribution-gpg-keys").rglob(key), None)
if gpgpath:
- return f"file://{gpgpath}"
+ return f"file://{Path('/') / gpgpath.relative_to(context.config.tools())}"
gpgpath = next(Path(context.pkgmngr / "etc/pki/rpm-gpg").rglob(key), None)
if gpgpath:
if not (confdir / "macros.lang").exists() and context.config.locale:
(confdir / "macros.lang").write_text(f"%_install_langs {context.config.locale}")
- plugindir = Path(bwrap(context, ["rpm", "--eval", "%{__plugindir}"], stdout=subprocess.PIPE).stdout.strip())
- if plugindir.exists():
+ plugindir = Path(run(["rpm", "--eval", "%{__plugindir}"],
+ sandbox=context.sandbox(), stdout=subprocess.PIPE).stdout.strip())
+ if (plugindir := context.config.tools() / plugindir.relative_to("/")).exists():
with (confdir / "macros.disable-plugins").open("w") as f:
for plugin in plugindir.iterdir():
f.write(f"%__transaction_{plugin.stem} %{{nil}}\n")
-def fixup_rpmdb_location(root: Path) -> None:
+def fixup_rpmdb_location(context: Context) -> None:
# On Debian, rpm/dnf ship with a patch to store the rpmdb under ~/ so it needs to be copied back to the
# right location, otherwise the rpmdb will be broken. See: https://bugs.debian.org/1004863. We also
# replace it with a symlink so that any further rpm operations immediately use the correct location.
- rpmdb_home = root / "root/.rpmdb"
+ rpmdb_home = context.root / "root/.rpmdb"
if not rpmdb_home.exists() or rpmdb_home.is_symlink():
return
# Take into account the new location in F36
- rpmdb = root / "usr/lib/sysimage/rpm"
+ rpmdb = context.root / "usr/lib/sysimage/rpm"
if not rpmdb.exists():
- rpmdb = root / "var/lib/rpm"
- rmtree(rpmdb)
+ rpmdb = context.root / "var/lib/rpm"
+ rmtree(rpmdb, sandbox=context.sandbox(options=["--bind", rpmdb.parent, rpmdb.parent]))
shutil.move(rpmdb_home, rpmdb)
rpmdb_home.symlink_to(os.path.relpath(rpmdb, start=rpmdb_home.parent))
import textwrap
from collections.abc import Sequence
-from mkosi.bubblewrap import apivfs_cmd, bwrap
from mkosi.config import yes_no
from mkosi.context import Context
from mkosi.installer.rpm import RpmRepository, fixup_rpmdb_location, setup_rpm
+from mkosi.run import run
+from mkosi.sandbox import apivfs_cmd, finalize_crypto_mounts
from mkosi.types import PathString
from mkosi.util import sort_packages
options: Sequence[str] = (),
apivfs: bool = True,
) -> None:
- cmd = apivfs_cmd(context.root) if apivfs else []
- bwrap(context, cmd + zypper_cmd(context) + [verb, *options, *sort_packages(packages)],
- network=True, env=context.config.environment)
+ run(
+ zypper_cmd(context) + [verb, *options, *sort_packages(packages)],
+ sandbox=(
+ context.sandbox(
+ network=True,
+ options=[
+ "--bind", context.root, context.root,
+ "--bind", context.cache_dir, context.cache_dir,
+ *finalize_crypto_mounts(tools=context.config.tools()),
+ ],
+ ) + (apivfs_cmd(context.root, tools=context.config.tools()) if apivfs else [])
+ ),
+ env=context.config.environment,
+ )
- fixup_rpmdb_location(context.root)
+ fixup_rpmdb_location(context)
from mkosi.log import complete_step, log_step
from mkosi.run import run
+from mkosi.types import PathString
def loaded_modules() -> list[str]:
return path.name.partition(".")[0]
-def resolve_module_dependencies(root: Path, kver: str, modules: Sequence[str]) -> tuple[set[Path], set[Path]]:
+def resolve_module_dependencies(
+ root: Path,
+ kver: str,
+ modules: Sequence[str],
+ *,
+ sandbox: Sequence[PathString] = (),
+) -> tuple[set[Path], set[Path]]:
"""
Returns a tuple of sets containing the paths to the module and firmware dependencies of the given list
of module names (including the given module paths themselves). The paths are returned relative to the
for i in range(0, len(nametofile.keys()), 8500):
chunk = list(nametofile.keys())[i:i+8500]
info += run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *chunk],
- stdout=subprocess.PIPE).stdout.strip()
+ stdout=subprocess.PIPE, sandbox=sandbox).stdout.strip()
log_step("Calculating required kernel modules and firmware")
include: Sequence[str],
exclude: Sequence[str],
host: bool,
+ sandbox: Sequence[PathString] = (),
) -> Iterator[Path]:
modulesd = root / "usr/lib/modules" / kver
modules = filter_kernel_modules(root, kver, include=include, exclude=exclude, host=host)
names = [module_path_to_name(m) for m in modules]
- mods, firmware = resolve_module_dependencies(root, kver, names)
+ mods, firmware = resolve_module_dependencies(root, kver, names, sandbox=sandbox)
def files() -> Iterator[Path]:
yield modulesd.parent
include: Sequence[str],
exclude: Sequence[str],
host: bool,
+ sandbox: Sequence[PathString] = (),
) -> None:
if not include and not exclude:
return
with complete_step("Applying kernel module filters"):
- required = set(gen_required_kernel_modules(root, kver, include=include, exclude=exclude, host=host))
+ required = set(
+ gen_required_kernel_modules(root, kver, include=include, exclude=exclude, host=host, sandbox=sandbox)
+ )
for m in (root / "usr/lib/modules" / kver).rglob("*.ko*"):
if m in required:
if not (root / dbpath).exists():
dbpath = "/var/lib/rpm"
- c = run(["rpm",
- f"--root={root}",
- f"--dbpath={dbpath}",
- "-qa",
- "--qf", r"%{NEVRA}\t%{SOURCERPM}\t%{NAME}\t%{ARCH}\t%{LONGSIZE}\t%{INSTALLTIME}\n"],
- stdout=subprocess.PIPE)
+ c = run(
+ [
+ "rpm",
+ f"--root={root}",
+ f"--dbpath={dbpath}",
+ "-qa",
+ "--qf", r"%{NEVRA}\t%{SOURCERPM}\t%{NAME}\t%{ARCH}\t%{LONGSIZE}\t%{INSTALLTIME}\n",
+ ],
+ stdout=subprocess.PIPE,
+ sandbox=self.config.sandbox(),
+ )
packages = sorted(c.stdout.splitlines())
source = self.source_packages.get(srpm)
if source is None:
- c = run(["rpm",
- f"--root={root}",
- f"--dbpath={dbpath}",
- "-q",
- "--changelog",
- nevra],
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
+ c = run(
+ [
+ "rpm",
+ f"--root={root}",
+ f"--dbpath={dbpath}",
+ "-q",
+ "--changelog",
+ nevra,
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ sandbox=self.config.sandbox(),
+ )
changelog = c.stdout.strip()
source = SourcePackageManifest(srpm, changelog)
self.source_packages[srpm] = source
source.add(manifest)
def record_deb_packages(self, root: Path) -> None:
- c = run(["dpkg-query",
- f"--admindir={root}/var/lib/dpkg",
- "--show",
- "--showformat",
- r'${Package}\t${source:Package}\t${Version}\t${Architecture}\t${Installed-Size}\t${db-fsys:Last-Modified}\n'],
- stdout=subprocess.PIPE)
+ c = run(
+ [
+ "dpkg-query",
+ f"--admindir={root}/var/lib/dpkg",
+ "--show",
+ "--showformat",
+ r'${Package}\t${source:Package}\t${Version}\t${Architecture}\t${Installed-Size}\t${db-fsys:Last-Modified}\n',
+ ],
+ stdout=subprocess.PIPE,
+ sandbox=self.config.sandbox(),
+ )
packages = sorted(c.stdout.splitlines())
# We have to run from the root, because if we use the RootDir option to make
# apt from the host look at the repositories in the image, it will also pick up
# the 'methods' executables from there, but the ABI might not be compatible.
- result = run(cmd, stdout=subprocess.PIPE)
+ result = run(cmd, stdout=subprocess.PIPE, sandbox=self.config.sandbox())
source_package = SourcePackageManifest(source, result.stdout.strip())
self.source_packages[source] = source_package
from mkosi.run import run
from mkosi.types import PathString
-from mkosi.util import INVOKING_USER, umask
+from mkosi.util import umask
from mkosi.versioncomp import GenericVersion
yield where
finally:
delete_whiteout_files(upperdir)
-
-
-@contextlib.contextmanager
-def mount_usr(tree: Optional[Path], umount: bool = True) -> Iterator[None]:
- if not tree:
- yield
- return
-
- # If we replace /usr, we should ignore any local modifications made to PATH as any of those binaries
- # might not work anymore when /usr is replaced wholesale. We also make sure that both /usr/bin and
- # /usr/sbin/ are searched so that e.g. if the host is Arch and the root is Debian we don't ignore the
- # binaries from /usr/sbin in the Debian root.
- old = os.environ["PATH"]
- os.environ["PATH"] = "/usr/bin:/usr/sbin"
-
- try:
- # If we mounted over /usr, trying to use umount will fail with "target is busy", because umount is
- # being called from /usr, which we're trying to unmount. To work around this issue, we do a lazy
- # unmount.
- with mount(
- what=tree / "usr",
- where=Path("/usr"),
- operation="--bind",
- read_only=True,
- lazy=True,
- umount=umount,
- ):
- yield
- finally:
- os.environ["PATH"] = old
-
-
-@contextlib.contextmanager
-def mount_passwd() -> Iterator[None]:
- with tempfile.NamedTemporaryFile(prefix="mkosi.passwd", mode="w") as passwd:
- passwd.write("root:x:0:0:root:/root:/bin/sh\n")
- if INVOKING_USER.uid != 0:
- name = INVOKING_USER.name()
- home = INVOKING_USER.home()
- passwd.write(f"{name}:x:{INVOKING_USER.uid}:{INVOKING_USER.gid}:{name}:{home}:/bin/sh\n")
- passwd.flush()
- os.fchown(passwd.file.fileno(), INVOKING_USER.uid, INVOKING_USER.gid)
-
- with mount(passwd.name, Path("/etc/passwd"), operation="--bind"):
- yield
-
-
-def finalize_passwd_mounts(root: Path) -> list[PathString]:
- """
- If passwd or a related file exists in the apivfs directory, bind mount it over the host files while we
- run the command, to make sure that the command we run uses user/group information from the apivfs
- directory instead of from the host. If the file doesn't exist yet, mount over /dev/null instead.
- """
- options: list[PathString] = []
-
- for f in ("passwd", "group", "shadow", "gshadow"):
- if not (Path("/etc") / f).exists():
- continue
- p = root / "etc" / f
- if p.exists():
- options += ["--bind", p, f"/etc/{f}"]
- else:
- options += ["--bind", "/dev/null", f"/etc/{f}"]
-
- return options
from mkosi.log import die
from mkosi.run import run
+from mkosi.types import PathString
@dataclasses.dataclass(frozen=True)
GRUB_BOOT_PARTITION_UUID = "21686148-6449-6e6f-744e-656564454649"
-def find_partitions(image: Path) -> list[Partition]:
+def find_partitions(image: Path, *, sandbox: Sequence[PathString]) -> list[Partition]:
output = json.loads(run(["systemd-repart", "--json=short", image],
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout)
+ stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
+ sandbox=sandbox).stdout)
return [Partition.from_dict(d) for d in output]
import sys
import tempfile
import uuid
-from collections.abc import Iterator, Mapping
+from collections.abc import Iterator
from pathlib import Path
from typing import Optional
format_bytes,
)
from mkosi.log import die
-from mkosi.mounts import mount_passwd
from mkosi.partition import finalize_root, find_partitions
from mkosi.run import AsyncioThread, become_root, find_binary, fork_and_wait, run, spawn
from mkosi.tree import copy_tree, rmtree
unknown = enum.auto()
@classmethod
- def identify(cls, path: PathString) -> "KernelType":
- type = run(["bootctl", "kernel-identify", path], stdout=subprocess.PIPE).stdout.strip()
+ def identify(cls, config: Config, path: Path) -> "KernelType":
+ type = run(["bootctl", "kernel-identify", path],
+ stdout=subprocess.PIPE, sandbox=config.sandbox(options=["--ro-bind", path, path])).stdout.strip()
try:
return cls(type)
binaries = ["qemu", "qemu-kvm"] if config.architecture.is_native() else []
binaries += [f"qemu-system-{config.architecture.to_qemu()}"]
for binary in binaries:
- if shutil.which(binary) is not None:
+ if find_binary(binary, root=config.tools()) is not None:
return binary
die("Couldn't find QEMU/KVM binary")
def find_ovmf_firmware(config: Config) -> tuple[Path, bool]:
FIRMWARE_LOCATIONS = {
Architecture.x86_64: [
- "/usr/share/ovmf/x64/OVMF_CODE.secboot.fd",
- "/usr/share/qemu/ovmf-x86_64.smm.bin",
+ "usr/share/ovmf/x64/OVMF_CODE.secboot.fd",
+ "usr/share/qemu/ovmf-x86_64.smm.bin",
+ "usr/share/edk2/x64/OVMF_CODE.secboot.4m.fd",
+ "usr/share/edk2/x64/OVMF_CODE.secboot.fd",
],
Architecture.x86: [
- "/usr/share/edk2/ovmf-ia32/OVMF_CODE.secboot.fd",
- "/usr/share/OVMF/OVMF32_CODE_4M.secboot.fd"
+ "usr/share/edk2/ovmf-ia32/OVMF_CODE.secboot.fd",
+ "usr/share/OVMF/OVMF32_CODE_4M.secboot.fd",
+ "usr/share/edk2/ia32/OVMF_CODE.secboot.4m.fd",
+ "usr/share/edk2/ia32/OVMF_CODE.secboot.fd",
],
}.get(config.architecture, [])
for firmware in FIRMWARE_LOCATIONS:
- if os.path.exists(firmware):
- return Path(firmware), True
+ if (config.tools() / firmware).exists():
+ return Path("/") / firmware, True
FIRMWARE_LOCATIONS = {
Architecture.x86_64: [
- "/usr/share/ovmf/ovmf_code_x64.bin",
- "/usr/share/ovmf/x64/OVMF_CODE.fd",
- "/usr/share/qemu/ovmf-x86_64.bin",
+ "usr/share/ovmf/ovmf_code_x64.bin",
+ "usr/share/ovmf/x64/OVMF_CODE.fd",
+ "usr/share/qemu/ovmf-x86_64.bin",
+ "usr/share/edk2/x64/OVMF_CODE.4m.fd",
+ "usr/share/edk2/x64/OVMF_CODE.fd",
],
- Architecture.x86: ["/usr/share/ovmf/ovmf_code_ia32.bin", "/usr/share/edk2/ovmf-ia32/OVMF_CODE.fd"],
- Architecture.arm64: ["/usr/share/AAVMF/AAVMF_CODE.fd"],
- Architecture.arm: ["/usr/share/AAVMF/AAVMF32_CODE.fd"],
+ Architecture.x86: [
+ "usr/share/ovmf/ovmf_code_ia32.bin",
+ "usr/share/edk2/ovmf-ia32/OVMF_CODE.fd",
+ "usr/share/edk2/ia32/OVMF_CODE.4m.fd",
+ "usr/share/edk2/ia32/OVMF_CODE.fd",
+ ],
+ Architecture.arm64: ["usr/share/AAVMF/AAVMF_CODE.fd"],
+ Architecture.arm: ["usr/share/AAVMF/AAVMF32_CODE.fd"],
}.get(config.architecture, [])
for firmware in FIRMWARE_LOCATIONS:
- if os.path.exists(firmware):
+ if (config.tools() / firmware).exists():
logging.warning("Couldn't find OVMF firmware blob with secure boot support, "
"falling back to OVMF firmware blobs without secure boot support.")
- return Path(firmware), False
+ return Path("/") / firmware, False
# If we can't find an architecture specific path, fall back to some generic paths that might also work.
FIRMWARE_LOCATIONS = [
- "/usr/share/edk2/ovmf/OVMF_CODE.secboot.fd",
- "/usr/share/edk2-ovmf/OVMF_CODE.secboot.fd",
- "/usr/share/qemu/OVMF_CODE.secboot.fd",
- "/usr/share/ovmf/OVMF.secboot.fd",
- "/usr/share/OVMF/OVMF_CODE_4M.secboot.fd",
- "/usr/share/OVMF/OVMF_CODE.secboot.fd",
+ "usr/share/edk2/ovmf/OVMF_CODE.secboot.fd",
+ "usr/share/edk2-ovmf/OVMF_CODE.secboot.fd",
+ "usr/share/qemu/OVMF_CODE.secboot.fd",
+ "usr/share/ovmf/OVMF.secboot.fd",
+ "usr/share/OVMF/OVMF_CODE_4M.secboot.fd",
+ "usr/share/OVMF/OVMF_CODE.secboot.fd",
]
for firmware in FIRMWARE_LOCATIONS:
- if os.path.exists(firmware):
- return Path(firmware), True
+ if (config.tools() / firmware).exists():
+ return Path("/") / firmware, True
FIRMWARE_LOCATIONS = [
- "/usr/share/edk2/ovmf/OVMF_CODE.fd",
- "/usr/share/edk2-ovmf/OVMF_CODE.fd",
- "/usr/share/qemu/OVMF_CODE.fd",
- "/usr/share/ovmf/OVMF.fd",
- "/usr/share/OVMF/OVMF_CODE_4M.fd",
- "/usr/share/OVMF/OVMF_CODE.fd",
+ "usr/share/edk2/ovmf/OVMF_CODE.fd",
+ "usr/share/edk2-ovmf/OVMF_CODE.fd",
+ "usr/share/qemu/OVMF_CODE.fd",
+ "usr/share/ovmf/OVMF.fd",
+ "usr/share/OVMF/OVMF_CODE_4M.fd",
+ "usr/share/OVMF/OVMF_CODE.fd",
]
for firmware in FIRMWARE_LOCATIONS:
- if os.path.exists(firmware):
+ if (config.tools() / firmware).exists():
logging.warning("Couldn't find OVMF firmware blob with secure boot support, "
"falling back to OVMF firmware blobs without secure boot support.")
- return Path(firmware), False
+ return Path("/") / firmware, False
die("Couldn't find OVMF UEFI firmware blob.")
if config.architecture == Architecture.x86_64:
OVMF_VARS_LOCATIONS += [
- "/usr/share/ovmf/x64/OVMF_VARS.fd",
- "/usr/share/qemu/ovmf-x86_64-vars.bin",
+ "usr/share/ovmf/x64/OVMF_VARS.fd",
+ "usr/share/qemu/ovmf-x86_64-vars.bin",
+ "usr/share/edk2/x64/OVMF_VARS.4m.fd",
+ "usr/share/edk2/x64/OVMF_VARS.fd",
]
elif config.architecture == Architecture.x86:
OVMF_VARS_LOCATIONS += [
- "/usr/share/edk2/ovmf-ia32/OVMF_VARS.fd",
- "/usr/share/OVMF/OVMF32_VARS_4M.fd",
+ "usr/share/edk2/ovmf-ia32/OVMF_VARS.fd",
+ "usr/share/OVMF/OVMF32_VARS_4M.fd",
+ "usr/share/edk2/ia32/OVMF_VARS.4m.fd",
+ "usr/share/edk2/ia32/OVMF_VARS.fd",
]
elif config.architecture == Architecture.arm:
- OVMF_VARS_LOCATIONS += ["/usr/share/AAVMF/AAVMF32_VARS.fd"]
+ OVMF_VARS_LOCATIONS += ["usr/share/AAVMF/AAVMF32_VARS.fd"]
elif config.architecture == Architecture.arm64:
- OVMF_VARS_LOCATIONS += ["/usr/share/AAVMF/AAVMF_VARS.fd"]
+ OVMF_VARS_LOCATIONS += ["usr/share/AAVMF/AAVMF_VARS.fd"]
OVMF_VARS_LOCATIONS += [
- "/usr/share/edk2/ovmf/OVMF_VARS.fd",
- "/usr/share/edk2-ovmf/OVMF_VARS.fd",
- "/usr/share/qemu/OVMF_VARS.fd",
- "/usr/share/ovmf/OVMF_VARS.fd",
- "/usr/share/OVMF/OVMF_VARS_4M.fd",
- "/usr/share/OVMF/OVMF_VARS.fd",
+ "usr/share/edk2/ovmf/OVMF_VARS.fd",
+ "usr/share/edk2-ovmf/OVMF_VARS.fd",
+ "usr/share/qemu/OVMF_VARS.fd",
+ "usr/share/ovmf/OVMF_VARS.fd",
+ "usr/share/OVMF/OVMF_VARS_4M.fd",
+ "usr/share/OVMF/OVMF_VARS.fd",
]
for location in OVMF_VARS_LOCATIONS:
- if os.path.exists(location):
- return Path(location)
+ if (config.tools() / location).exists():
+ return config.tools() / location
die("Couldn't find OVMF UEFI variables file.")
@contextlib.contextmanager
-def start_swtpm() -> Iterator[Path]:
+def start_swtpm(config: Config) -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="mkosi-swtpm") as state:
- # Make sure qemu can access the swtpm socket in this directory.
- os.chown(state, INVOKING_USER.uid, INVOKING_USER.gid)
-
- cmdline = [
- "swtpm",
- "socket",
- "--tpm2",
- "--tpmstate", f"dir={state}",
- ]
+ cmdline = ["swtpm", "socket", "--tpm2", "--tpmstate", f"dir={state}"]
# We create the socket ourselves and pass the fd to swtpm to avoid race conditions where we start qemu before
# swtpm has had the chance to create the socket (or where we try to chown it first).
sock.bind(os.fspath(path))
sock.listen()
- # Make sure qemu can connect to the swtpm socket.
- os.chown(path, INVOKING_USER.uid, INVOKING_USER.gid)
-
cmdline += ["--ctrl", f"type=unixio,fd={sock.fileno()}"]
- with spawn(
- cmdline,
- user=INVOKING_USER.uid,
- group=INVOKING_USER.gid,
- pass_fds=(sock.fileno(),)
- ) as proc:
+ with spawn(cmdline, pass_fds=(sock.fileno(),), sandbox=config.sandbox()) as proc:
try:
yield path
finally:
proc.wait()
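The same handoff in isolation, as a minimal sketch (state directory and socket name hypothetical): bind and listen on the control socket ourselves, then pass the inherited fd to swtpm, so a client such as qemu can connect even before swtpm finishes starting up.

    import os, socket, subprocess, tempfile

    with tempfile.TemporaryDirectory() as state, socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(os.path.join(state, "swtpm.sock"))
        sock.listen()
        with subprocess.Popen(
            ["swtpm", "socket", "--tpm2", "--tpmstate", f"dir={state}",
             "--ctrl", f"type=unixio,fd={sock.fileno()}"],
            pass_fds=(sock.fileno(),),  # keep the listening fd open in the child, same fd number
        ) as proc:
            ...  # hand os.path.join(state, "swtpm.sock") to qemu's -chardev socket
            proc.terminate()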
-def find_virtiofsd() -> Optional[Path]:
- if p := find_binary("virtiofsd"):
+def find_virtiofsd(*, tools: Path = Path("/")) -> Optional[Path]:
+ if p := find_binary("virtiofsd", root=tools):
return p
- if (p := Path("/usr/libexec/virtiofsd")).exists():
- return p
+ if (p := tools / "usr/libexec/virtiofsd").exists():
+ return Path("/") / p.relative_to(tools)
- if (p := Path("/usr/lib/virtiofsd")).exists():
- return p
+ if (p := tools / "usr/lib/virtiofsd").exists():
+ return Path("/") / p.relative_to(tools)
return None
@contextlib.contextmanager
-def start_virtiofsd(directory: Path, *, uidmap: bool) -> Iterator[Path]:
- virtiofsd = find_virtiofsd()
+def start_virtiofsd(config: Config, directory: Path, *, uidmap: bool) -> Iterator[Path]:
+ virtiofsd = find_virtiofsd(tools=config.tools())
if virtiofsd is None:
die("virtiofsd must be installed to boot directory images or use RuntimeTrees= with mkosi qemu")
"--xattr",
# qemu's client doesn't seem to support announcing submounts so disable the feature to avoid the warning.
"--no-announce-submounts",
+ "--sandbox=chroot",
]
- # Map the given user/group to root in the virtual machine for the virtiofs instance to make sure all files
- # created by root in the VM are owned by the user running mkosi on the host.
- if uidmap:
- cmdline += [
- "--uid-map", f":0:{INVOKING_USER.uid}:1:",
- "--gid-map", f":0:{INVOKING_USER.gid}:1:"
- ]
-
# We create the socket ourselves and pass the fd to virtiofsd to avoid race conditions where we start qemu
# before virtiofsd has had the chance to create the socket (or where we try to chown it first).
with (
tempfile.TemporaryDirectory(prefix="mkosi-virtiofsd") as context,
socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock,
):
- # Make sure qemu can access the virtiofsd socket in this directory.
+ # Make sure virtiofsd can access the socket in this directory.
os.chown(context, INVOKING_USER.uid, INVOKING_USER.gid)
# Make sure we can use the socket name as a unique identifier for the fs as well but make sure it's not too
sock.bind(os.fspath(path))
sock.listen()
- # Make sure qemu can connect to the virtiofsd socket.
+ # Make sure virtiofsd can connect to the socket.
os.chown(path, INVOKING_USER.uid, INVOKING_USER.gid)
cmdline += ["--fd", str(sock.fileno())]
- # virtiofsd has to run unprivileged to use the --uid-map and --gid-map options, so run it as the given
- # user/group if those are provided.
with spawn(
cmdline,
+ pass_fds=(sock.fileno(),),
+ # When not invoked as root, bubblewrap will automatically map the current uid/gid to the requested uid/gid
+ # in the user namespace it spawns, so by specifying --uid 0 --gid 0 we'll get a userns with the current
+ # uid/gid mapped to root in the userns. --cap-add=all is required to make virtiofsd work. Since it drops
+ # capabilities itself, we don't bother figuring out the exact set of capabilities it needs.
user=INVOKING_USER.uid if uidmap else None,
group=INVOKING_USER.gid if uidmap else None,
- pass_fds=(sock.fileno(),),
- preexec_fn=become_root if not uidmap and os.getuid() != 0 else None,
+ preexec_fn=become_root if not uidmap else None,
+ sandbox=config.sandbox(
+ options=[
+ "--uid", "0",
+ "--gid", "0",
+ "--cap-add", "all",
+ "--bind", directory, directory,
+ ],
+ ),
) as proc:
try:
yield path
copy_tree(
src, tmp,
preserve=config.output_format == OutputFormat.directory,
- use_subvolumes=config.use_subvolumes
+ use_subvolumes=config.use_subvolumes,
+ tools=config.tools(),
+ sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]),
)
fork_and_wait(copy)
if config.output_format == OutputFormat.directory:
become_root()
- rmtree(tmp)
+ rmtree(tmp, sandbox=config.sandbox(options=["--ro-bind", src, src, "--bind", tmp.parent, tmp.parent]))
fork_and_wait(rm)
def qemu_version(config: Config) -> GenericVersion:
- return GenericVersion(run([find_qemu_binary(config), "--version"], stdout=subprocess.PIPE).stdout.split()[3])
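+ # "qemu-system-<arch> --version" prints e.g. "QEMU emulator version 8.1.2 (...)",
+ # so the version number is the fourth whitespace-separated field.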
+ return GenericVersion(run([find_qemu_binary(config), "--version"],
+ stdout=subprocess.PIPE, sandbox=config.sandbox()).stdout.split()[3])
def want_scratch(config: Config) -> bool:
return config.runtime_scratch == ConfigFeature.enabled or (
config.runtime_scratch == ConfigFeature.auto and
- shutil.which(f"mkfs.{config.distribution.filesystem()}") is not None
+ find_binary(f"mkfs.{config.distribution.filesystem()}", root=config.tools()) is not None
)
-def run_qemu(args: Args, config: Config, qemu_device_fds: Mapping[QemuDeviceNode, int]) -> None:
+def run_qemu(args: Args, config: Config) -> None:
if config.output_format not in (
OutputFormat.disk,
OutputFormat.cpio,
if config.qemu_kvm == ConfigFeature.enabled and not config.architecture.is_native():
die(f"KVM acceleration requested but {config.architecture} does not match the native host architecture")
+ # After we unshare the user namespace to sandbox qemu, we might not have access to /dev/kvm or related device nodes
+ # anymore as access to these might be gated behind the kvm group and we won't be part of the kvm group anymore
+ # after unsharing the user namespace. To get around this, open all those device nodes early so we can pass them as file
+ # descriptors to qemu later. Note that we can't pass the kvm file descriptor to qemu until version 9.0.
+ qemu_device_fds = {
+ d: d.open()
+ for d in QemuDeviceNode
+ if d.feature(config) != ConfigFeature.disabled and d.available(log=True)
+ }
+
have_kvm = ((qemu_version(config) < QEMU_KVM_DEVICE_VERSION and QemuDeviceNode.kvm.available()) or
(qemu_version(config) >= QEMU_KVM_DEVICE_VERSION and QemuDeviceNode.kvm in qemu_device_fds))
if (config.qemu_kvm == ConfigFeature.enabled and not have_kvm):
if config.qemu_firmware == QemuFirmware.auto:
if kernel:
- firmware = QemuFirmware.uefi if KernelType.identify(kernel) != KernelType.unknown else QemuFirmware.linux
+ firmware = (
+ QemuFirmware.uefi
+ if KernelType.identify(config, kernel) != KernelType.unknown
+ else QemuFirmware.linux
+ )
elif (
config.output_format in (OutputFormat.cpio, OutputFormat.directory) or
config.architecture.to_efi() is None
notifications: dict[str, str] = {}
with contextlib.ExitStack() as stack:
- if (
- os.getuid() == 0 and
- not INVOKING_USER.invoked_as_root and
- config.runtime_trees
- ):
- # In this scenario newuidmap might fail when invoked by virtiofsd as the user running virtiofsd will not
- # be resolvable to a name via NSS so we have to trick newuidmap by mounting over /etc/passwd. Once
- # https://gitlab.com/virtio-fs/virtiofsd/-/issues/137 is fixed we can set up the user namespace ourselves
- # without uidmap to avoid having to mount over /etc/passwd.
- stack.enter_context(mount_passwd())
-
if firmware == QemuFirmware.uefi:
ovmf_vars = stack.enter_context(tempfile.NamedTemporaryFile(prefix="mkosi-ovmf-vars"))
shutil.copy2(config.qemu_firmware_variables or find_ovmf_vars(config), Path(ovmf_vars.name))
- # Make sure qemu can access the ephemeral vars.
- os.chown(ovmf_vars.name, INVOKING_USER.uid, INVOKING_USER.gid)
cmdline += ["-drive", f"file={ovmf_vars.name},if=pflash,format=raw"]
if ovmf_supports_sb:
cmdline += [
# CD-ROM devices have sector size 2048 so we transform disk images into ones with sector size 2048.
src = (config.output_dir_or_cwd() / config.output_with_compression).resolve()
fname = src.parent / f"{src.name}-{uuid.uuid4().hex}"
- run(["systemd-repart",
- "--definitions", "",
- "--no-pager",
- "--pretty=no",
- "--offline=yes",
- "--empty=create",
- "--size=auto",
- "--sector-size=2048",
- "--copy-from", src,
- fname])
+ run(
+ [
+ "systemd-repart",
+ "--definitions", "",
+ "--no-pager",
+ "--pretty=no",
+ "--offline=yes",
+ "--empty=create",
+ "--size=auto",
+ "--sector-size=2048",
+ "--copy-from", src,
+ fname,
+ ],
+ sandbox=config.sandbox(options=["--bind", fname.parent, fname.parent, "--ro-bind", src, src]),
+ )
stack.callback(lambda: fname.unlink())
elif config.ephemeral and config.output_format not in (OutputFormat.cpio, OutputFormat.uki):
fname = stack.enter_context(
else:
fname = config.output_dir_or_cwd() / config.output_with_compression
- # Make sure qemu can access the ephemeral copy. Not required for directory output because we don't pass that
- # directly to qemu, but indirectly via virtiofsd.
- if config.output_format != OutputFormat.directory:
- os.chown(fname, INVOKING_USER.uid, INVOKING_USER.gid)
-
if config.output_format == OutputFormat.disk and config.runtime_size:
- run(["systemd-repart",
- "--definitions", "",
- "--no-pager",
- f"--size={config.runtime_size}",
- "--pretty=no",
- "--offline=yes",
- fname])
+ run(
+ [
+ "systemd-repart",
+ "--definitions", "",
+ "--no-pager",
+ f"--size={config.runtime_size}",
+ "--pretty=no",
+ "--offline=yes",
+ fname,
+ ],
+ sandbox=config.sandbox(options=["--bind", fname, fname]),
+ )
if (
kernel and
- (KernelType.identify(kernel) != KernelType.uki or not config.architecture.supports_smbios(firmware))
+ (
+ KernelType.identify(config, kernel) != KernelType.uki or
+ not config.architecture.supports_smbios(firmware)
+ )
):
kcl = config.kernel_command_line + config.kernel_command_line_extra
else:
f = stack.enter_context(tempfile.NamedTemporaryFile(prefix="mkosi-fw-cfg", mode="w"))
f.write(v)
f.flush()
- os.fchown(f.fileno(), INVOKING_USER.uid, INVOKING_USER.gid)
cmdline += ["-fw_cfg", f"name=opt/io.systemd.credentials/{k},file={f.name}"]
elif kernel:
kcl += [f"systemd.set_credential_binary={k}:{payload}"]
elif config.output_format == OutputFormat.disk:
# We can't rely on gpt-auto-generator when doing a direct kernel boot, so synthesize a root=
# kernel argument instead.
- root = finalize_root(find_partitions(fname))
+ root = finalize_root(
+ find_partitions(fname, sandbox=config.sandbox(options=["--ro-bind", fname, fname]))
+ )
if not root:
die("Cannot perform a direct kernel boot without a root or usr partition")
kcl += [root]
elif config.output_format == OutputFormat.directory:
- sock = stack.enter_context(start_virtiofsd(fname, uidmap=False))
+ sock = stack.enter_context(start_virtiofsd(config, fname, uidmap=False))
cmdline += [
"-chardev", f"socket,id={sock.name},path={sock}",
"-device", f"vhost-user-fs-pci,queue-size=1024,chardev={sock.name},tag=root",
kcl += ["root=root", "rootfstype=virtiofs", "rw"]
for tree in config.runtime_trees:
- sock = stack.enter_context(start_virtiofsd(tree.source, uidmap=True))
+ sock = stack.enter_context(start_virtiofsd(config, tree.source, uidmap=True))
tag = tree.target.name if tree.target else tree.source.name
cmdline += [
"-chardev", f"socket,id={sock.name},path={sock}",
if want_scratch(config):
scratch = stack.enter_context(tempfile.NamedTemporaryFile(dir="/var/tmp", prefix="mkosi-scratch"))
scratch.truncate(1024**4)
- os.chown(scratch.name, INVOKING_USER.uid, INVOKING_USER.gid)
run([f"mkfs.{config.distribution.filesystem()}", "-L", "scratch", scratch.name],
- stdout=subprocess.DEVNULL, stderr=None)
+ stdout=subprocess.DEVNULL, stderr=None, sandbox=config.sandbox())
cmdline += [
"-drive", f"if=none,id=scratch,file={scratch.name},format=raw",
"-device", "scsi-hd,drive=scratch",
if (
kernel and
- (KernelType.identify(kernel) != KernelType.uki or not config.architecture.supports_smbios(firmware))
+ (
+ KernelType.identify(config, kernel) != KernelType.uki or
+ not config.architecture.supports_smbios(firmware)
+ )
):
cmdline += ["-append", " ".join(kcl)]
elif config.architecture.supports_smbios(firmware):
if config.output_format == OutputFormat.cpio:
cmdline += ["-initrd", fname]
elif (
- kernel and KernelType.identify(kernel) != KernelType.uki and
+ kernel and KernelType.identify(config, kernel) != KernelType.uki and
"-initrd" not in args.cmdline and
(config.output_dir_or_cwd() / config.output_split_initrd).exists()
):
if (
firmware == QemuFirmware.uefi and
config.qemu_swtpm != ConfigFeature.disabled and
- shutil.which("swtpm") is not None
+ find_binary("swtpm", root=config.tools()) is not None
):
- sock = stack.enter_context(start_swtpm())
+ sock = stack.enter_context(start_swtpm(config))
cmdline += ["-chardev", f"socket,id=chrtpm,path={sock}",
"-tpmdev", "emulator,id=tpm0,chardev=chrtpm"]
tempfile.NamedTemporaryFile(dir=drive.directory or "/var/tmp", prefix=f"mkosi-drive-{drive.id}")
)
file.truncate(drive.size)
- os.chown(file.name, INVOKING_USER.uid, INVOKING_USER.gid)
arg = f"if=none,id={drive.id},file={file.name},format=raw"
if drive.options:
with spawn(
cmdline,
- # On Debian/Ubuntu, only users in the kvm group can access /dev/kvm. The invoking user might be part of the
- # kvm group, but the user namespace fake root user will definitely not be. Thus, we have to run qemu as the
- # invoking user to make sure we can access /dev/kvm. Of course, if we were invoked as root, none of this
- # matters as the root user will always be able to access /dev/kvm.
- user=INVOKING_USER.uid if not INVOKING_USER.invoked_as_root else None,
- group=INVOKING_USER.gid if not INVOKING_USER.invoked_as_root else None,
stdin=sys.stdin,
stdout=sys.stdout,
pass_fds=qemu_device_fds.values(),
env=os.environ,
log=False,
foreground=True,
+ sandbox=config.sandbox(network=True, devices=True, relaxed=True),
) as qemu:
# We have to close these before we wait for qemu otherwise we'll deadlock as qemu will never exit.
for fd in qemu_device_fds.values():
run(
cmd,
- user=INVOKING_USER.uid,
- group=INVOKING_USER.gid,
stdin=sys.stdin,
stdout=sys.stdout,
env=os.environ,
log=False,
+ sandbox=config.sandbox(network=True, devices=True, relaxed=True),
)
from types import TracebackType
from typing import Any, Callable, NoReturn, Optional
-from mkosi.log import ARG_DEBUG, die
+from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die
from mkosi.types import _FILE, CompletedProcess, PathString, Popen
from mkosi.util import INVOKING_USER, flock
env: Mapping[str, str] = {},
cwd: Optional[Path] = None,
log: bool = True,
+ preexec_fn: Optional[Callable[[], None]] = None,
+ sandbox: Sequence[PathString] = (),
) -> CompletedProcess:
+ sandbox = [os.fspath(x) for x in sandbox]
cmdline = [os.fspath(x) for x in cmdline]
if ARG_DEBUG.get():
- logging.info(f"+ {shlex.join(cmdline)}")
+ logging.info(f"+ {shlex.join(sandbox + cmdline)}")
if not stdout and not stderr:
# Unless explicit redirection is done, print all subprocess
elif stdin is None:
stdin = subprocess.DEVNULL
+ def preexec() -> None:
+ make_foreground_process()
+ if preexec_fn:
+ preexec_fn()
+
try:
# subprocess.run() will use SIGKILL to kill processes when an exception is raised.
# We'd prefer it to use SIGTERM instead, but since we can't configure which signal
# subprocess.run() uses, we remap SIGKILL to SIGTERM via the context manager below.
with sigkill_to_sigterm():
return subprocess.run(
- cmdline,
+ sandbox + cmdline,
check=check,
stdin=stdin,
stdout=stdout,
group=group,
env=env,
cwd=cwd,
- preexec_fn=make_foreground_process,
+ preexec_fn=preexec,
)
except FileNotFoundError as e:
die(f"{e.filename} not found.")
except subprocess.CalledProcessError as e:
+ if ARG_DEBUG_SHELL.get():
+ subprocess.run(
+ [*sandbox, "sh"],
+ check=False,
+ stdin=sys.stdin,
+ text=True,
+ user=user,
+ group=group,
+ env=env,
+ cwd=cwd,
+ preexec_fn=preexec,
+ )
if log:
log_process_failure(cmdline, e.returncode)
- raise e
+ # Remove the sandbox wrapper from the command line so the error shown to users is more readable.
+ e.cmd = cmdline
+ raise
finally:
make_foreground_process(new_process_group=False)
log: bool = True,
foreground: bool = False,
preexec_fn: Optional[Callable[[], None]] = None,
+ sandbox: Sequence[PathString] = (),
) -> Iterator[Popen]:
+ sandbox = [os.fspath(x) for x in sandbox]
cmdline = [os.fspath(x) for x in cmdline]
if ARG_DEBUG.get():
- logging.info(f"+ {shlex.join(cmdline)}")
+ logging.info(f"+ {shlex.join(sandbox + cmdline)}")
if not stdout and not stderr:
# Unless explicit redirection is done, print all subprocess
try:
with subprocess.Popen(
- cmdline,
+ sandbox + cmdline,
stdin=stdin,
stdout=stdout,
stderr=stderr,
make_foreground_process(new_process_group=False)
-def find_binary(*names: PathString, root: Optional[Path] = None) -> Optional[Path]:
+def find_binary(*names: PathString, root: Path = Path("/")) -> Optional[Path]:
+ if root != Path("/"):
+ path = ":".join(os.fspath(p) for p in (root / "usr/bin", root / "usr/sbin"))
+ else:
+ path = os.environ["PATH"]
+
for name in names:
- path = ":".join(os.fspath(p) for p in [root / "usr/bin", root / "usr/sbin"]) if root else os.environ["PATH"]
+ if Path(name).is_absolute():
+ name = root / Path(name).relative_to("/")
+ elif "/" in str(name):
+ name = root / name
+
if (binary := shutil.which(name, path=path)):
- return Path("/") / Path(binary).relative_to(root or "/")
+ if root != Path("/") and not Path(binary).is_relative_to(root):
+ return Path(binary)
+ else:
+ return Path("/") / Path(binary).relative_to(root)
return None
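Behaviour sketch (the tools tree path is hypothetical):

    # find_binary("tar") searches $PATH on the host and returns e.g. Path("/usr/bin/tar").
    # find_binary("tar", root=Path("/srv/tools")) searches /srv/tools/usr/{bin,sbin} and, on a
    # hit under the tree, returns the path rebased onto "/" (again Path("/usr/bin/tar")), i.e.
    # the path the binary will have once the tools tree /usr is mounted in the sandbox.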
# SPDX-License-Identifier: LGPL-2.1+
-import contextlib
import enum
import logging
import os
-import subprocess
-import sys
-from collections.abc import Mapping, Sequence
+from collections.abc import Sequence
from pathlib import Path
from typing import Optional
-from mkosi.context import Context
-from mkosi.log import ARG_DEBUG_SHELL
-from mkosi.mounts import finalize_passwd_mounts, mount_overlay
-from mkosi.run import find_binary, log_process_failure, run
-from mkosi.types import _FILE, CompletedProcess, PathString
-from mkosi.util import flatten, one_zero
+from mkosi.run import find_binary
+from mkosi.types import PathString
+from mkosi.util import INVOKING_USER, flatten, one_zero
# https://github.com/torvalds/linux/blob/master/include/uapi/linux/capability.h
return (int(hexcap, 16) & (1 << capability.value)) != 0
-def finalize_mounts(context: Context) -> list[str]:
+def finalize_passwd_mounts(root: Path) -> list[PathString]:
+ """
+ If passwd or a related file exists in the apivfs directory, bind mount it over the host files while we
+ run the command, to make sure that the command we run uses user/group information from the apivfs
+ directory instead of from the host. If the file doesn't exist yet, mount over /dev/null instead.
+ """
+ options: list[PathString] = []
+
+ for f in ("passwd", "group", "shadow", "gshadow"):
+ if not (Path("/etc") / f).exists():
+ continue
+ p = root / "etc" / f
+ if p.exists():
+ options += ["--bind", p, f"/etc/{f}"]
+ else:
+ options += ["--bind", "/dev/null", f"/etc/{f}"]
+
+ return options
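For example (sketch), for an image root that ships etc/passwd and etc/group but no shadow files, on a host where all four files exist, this yields:

    ["--bind", root / "etc/passwd", "/etc/passwd",
     "--bind", root / "etc/group", "/etc/group",
     "--bind", "/dev/null", "/etc/shadow",
     "--bind", "/dev/null", "/etc/gshadow"]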
+
+
+def finalize_crypto_mounts(tools: Path = Path("/")) -> list[PathString]:
mounts = [
- ((context.config.tools_tree or Path("/")) / subdir, Path("/") / subdir, True)
+ (tools / subdir, Path("/") / subdir)
for subdir in (
Path("etc/pki"),
Path("etc/ssl"),
Path("etc/pacman.d/gnupg"),
Path("var/lib/ca-certificates"),
)
- if ((context.config.tools_tree or Path("/")) / subdir).exists()
+ if (tools / subdir).exists()
]
- mounts += [
- (d, d, False)
- for d in (context.workspace, context.config.cache_dir, context.config.output_dir, context.config.build_dir)
- if d
- ]
-
- mounts += [(d, d, True) for d in context.config.extra_search_paths]
-
return flatten(
- ["--ro-bind" if readonly else "--bind", os.fspath(src), os.fspath(target)]
- for src, target, readonly
+ ["--ro-bind", src, target]
+ for src, target
in sorted(set(mounts), key=lambda s: s[1])
)
-def bwrap(
- context: Context,
- cmd: Sequence[PathString],
+def sandbox_cmd(
*,
network: bool = False,
devices: bool = False,
- options: Sequence[PathString] = (),
- log: bool = True,
scripts: Optional[Path] = None,
- env: Mapping[str, str] = {},
- stdin: _FILE = None,
- stdout: _FILE = None,
- stderr: _FILE = None,
- input: Optional[str] = None,
- check: bool = True,
-) -> CompletedProcess:
+ tools: Path = Path("/"),
+ relaxed: bool = False,
+ options: Sequence[PathString] = (),
+) -> list[PathString]:
cmdline: list[PathString] = [
"bwrap",
- "--ro-bind", "/usr", "/usr",
- "--ro-bind-try", "/nix/store", "/nix/store",
- # This mount is writable so bwrap can create extra directories or symlinks inside of it as needed. This isn't a
- # problem as the package manager directory is created by mkosi and thrown away when the build finishes.
- "--bind", context.pkgmngr / "etc", "/etc",
- "--ro-bind-try", "/etc/alternatives", "/etc/alternatives",
- "--bind", "/var/tmp", "/var/tmp",
+ "--ro-bind", tools / "usr", "/usr",
"--bind", "/tmp", "/tmp",
- "--bind", Path.cwd(), Path.cwd(),
- "--chdir", Path.cwd(),
*(["--unshare-net"] if not network and have_effective_cap(Capability.CAP_NET_ADMIN) else []),
"--die-with-parent",
"--proc", "/proc",
"--setenv", "SYSTEMD_OFFLINE", one_zero(network),
]
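+ # On NixOS, binaries link against libraries in /nix/store, so expose it when the tools
+ # tree ships one.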
+ if (tools / "nix/store").exists():
+ cmdline += ["--bind", tools / "nix/store", "/nix/store"]
+
if devices:
cmdline += [
"--bind", "/sys", "/sys",
+ "--bind", "/run", "/run",
"--dev-bind", "/dev", "/dev",
]
else:
cmdline += ["--dev", "/dev"]
- for p in Path("/").iterdir():
- if p.is_symlink():
- cmdline += ["--symlink", p.readlink(), p]
+ if relaxed:
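+ # In relaxed mode, keep most of the host's directories visible instead of hiding
+ # everything except /usr.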
+ dirs = ("/etc", "/opt", "/srv", "/media", "/mnt", "/var", os.fspath(INVOKING_USER.home()))
- if network:
- cmdline += ["--bind", "/etc/resolv.conf", "/etc/resolv.conf"]
+ for d in dirs:
+ cmdline += ["--bind", d, d]
+
+ # `Path.parents` only supports slices and negative indexing from Python 3.10 onwards.
+ # TODO: Remove list() when we depend on Python 3.10 or newer.
+ parents = list(Path.cwd().parents)
+ # Guard against a current working directory of / or directly below /, which has no
+ # grandparent to bind.
+ if len(parents) >= 2 and (d := os.fspath(parents[-2])) not in (*dirs, "/home", "/usr", "/nix", "/tmp"):
+ cmdline += ["--bind", d, d]
+ else:
+ cmdline += ["--bind", "/var/tmp", "/var/tmp"]
+
+ for d in ("bin", "sbin", "lib", "lib32", "lib64"):
+ if (p := tools / d).is_symlink():
+ cmdline += ["--symlink", p.readlink(), Path("/") / p.relative_to(tools)]
- cmdline += finalize_mounts(context) + [
- "--setenv", "PATH", f"{scripts or ''}:{os.environ['PATH']}",
+ path = "/usr/bin:/usr/sbin" if tools != Path("/") else os.environ["PATH"]
+
+ cmdline += [
+ "--setenv", "PATH", f"{scripts or ''}:{path}",
*options,
- "sh", "-c", "chmod 1777 /dev/shm && exec $0 \"$@\"",
]
- if setpgid := find_binary("setpgid"):
+ # If we're using /usr from a tools tree, we have to use /etc/alternatives from the tools tree as well, if it
+ # exists, since the symlinks in it point back into /usr. Apply this after the options so the caller can mount
+ # something else to /etc without overriding this mount.
+ if (tools / "etc/alternatives").exists():
+ cmdline += ["--ro-bind", tools / "etc/alternatives", "/etc/alternatives"]
+
+ if scripts:
+ cmdline += ["--ro-bind", scripts, scripts]
+
+ if network:
+ cmdline += ["--bind", "/etc/resolv.conf", "/etc/resolv.conf"]
+
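+ # bwrap's private /dev (--dev) creates /dev/shm without world-writable permissions, so
+ # fix them up; a bind mounted host /dev already has the right ones.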
+ if devices:
+ shm = ":"
+ else:
+ shm = "chmod 1777 /dev/shm"
+
+ cmdline += ["sh", "-c", f"{shm} && exec $0 \"$@\""]
+
+ if setpgid := find_binary("setpgid", root=tools):
cmdline += [setpgid, "--foreground", "--"]
- try:
- with (
- mount_overlay([Path("/usr"), context.pkgmngr / "usr"], where=Path("/usr"), lazy=True)
- if (context.pkgmngr / "usr").exists()
- else contextlib.nullcontext()
- ):
- return run(
- [*cmdline, *cmd],
- env=env,
- log=False,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- input=input,
- check=check,
- )
- except subprocess.CalledProcessError as e:
- if log:
- log_process_failure([os.fspath(s) for s in cmd], e.returncode)
- if ARG_DEBUG_SHELL.get():
- run([*cmdline, "sh"], stdin=sys.stdin, check=False, env=env, log=False)
- raise e
-
-
-def apivfs_cmd(root: Path) -> list[PathString]:
+ return cmdline
+
+
+def apivfs_cmd(root: Path, *, tools: Path = Path("/")) -> list[PathString]:
cmdline: list[PathString] = [
"bwrap",
"--dev-bind", "/", "/",
- "--chdir", Path.cwd(),
"--tmpfs", root / "run",
"--tmpfs", root / "tmp",
"--bind", os.getenv("TMPDIR", "/var/tmp"), root / "var/tmp",
cmdline += finalize_passwd_mounts(root)
- if setpgid := find_binary("setpgid"):
+ if setpgid := find_binary("setpgid", root=tools):
cmdline += [setpgid, "--foreground", "--"]
chmod = f"chmod 1777 {root / 'tmp'} {root / 'var/tmp'} {root / 'dev/shm'}"
return cmdline
-def chroot_cmd(root: Path, *, resolve: bool = False, options: Sequence[PathString] = ()) -> list[PathString]:
+def chroot_cmd(
+ root: Path,
+ *,
+ resolve: bool = False,
+ tools: Path = Path("/"),
+ options: Sequence[PathString] = (),
+) -> list[PathString]:
cmdline: list[PathString] = [
"sh", "-c",
# No exec here because we need to clean up the /work directory afterwards.
if setpgid := find_binary("setpgid", root=root):
cmdline += [setpgid, "--foreground", "--"]
- return apivfs_cmd(root) + cmdline
+ return apivfs_cmd(root, tools=tools) + cmdline
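A minimal sketch of how these prefixes are meant to compose, assuming the helpers live in mkosi.sandbox as in this patch (paths and the call site are invented): sandbox_cmd() yields the bwrap prefix that run() prepends via its new sandbox= parameter, while chroot_cmd() prefixes the command itself.

from pathlib import Path

from mkosi.run import run
from mkosi.sandbox import chroot_cmd, sandbox_cmd

tools = Path("/srv/tools")  # hypothetical tools tree
root = Path("/srv/image")   # hypothetical image root

run(
    chroot_cmd(root, tools=tools, resolve=True) + ["dnf", "--version"],
    sandbox=sandbox_cmd(tools=tools, network=True),
)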
import shutil
import subprocess
import tempfile
-from collections.abc import Iterator
+from collections.abc import Iterator, Sequence
from pathlib import Path
from mkosi.config import ConfigFeature
from mkosi.log import die
-from mkosi.run import run
+from mkosi.run import find_binary, run
from mkosi.types import PathString
-def statfs(path: Path) -> str:
- return run(["stat", "--file-system", "--format", "%T", path], stdout=subprocess.PIPE).stdout.strip()
+def statfs(path: Path, *, sandbox: Sequence[PathString] = ()) -> str:
+ return run(["stat", "--file-system", "--format", "%T", path],
+ sandbox=sandbox, stdout=subprocess.PIPE).stdout.strip()
-def is_subvolume(path: Path) -> bool:
- return path.is_dir() and statfs(path) == "btrfs" and path.stat().st_ino == 256
+def is_subvolume(path: Path, *, sandbox: Sequence[PathString] = ()) -> bool:
+ return path.is_dir() and statfs(path, sandbox=sandbox) == "btrfs" and path.stat().st_ino == 256
-def make_tree(path: Path, use_subvolumes: ConfigFeature = ConfigFeature.disabled) -> None:
- if use_subvolumes == ConfigFeature.enabled and not shutil.which("btrfs"):
+def make_tree(
+ path: Path,
+ *,
+ use_subvolumes: ConfigFeature = ConfigFeature.disabled,
+ tools: Path = Path("/"),
+ sandbox: Sequence[PathString] = (),
+) -> None:
+ if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
die("Subvolumes requested but the btrfs command was not found")
- if statfs(path.parent) != "btrfs":
+ if statfs(path.parent, sandbox=sandbox) != "btrfs":
if use_subvolumes == ConfigFeature.enabled:
die(f"Subvolumes requested but {path} is not located on a btrfs filesystem")
path.mkdir()
return
- if use_subvolumes != ConfigFeature.disabled and shutil.which("btrfs") is not None:
+ if use_subvolumes != ConfigFeature.disabled and find_binary("btrfs", root=tools) is not None:
result = run(["btrfs", "subvolume", "create", path],
- check=use_subvolumes == ConfigFeature.enabled).returncode
+ sandbox=sandbox, check=use_subvolumes == ConfigFeature.enabled).returncode
else:
result = 1
preserve: bool = True,
dereference: bool = False,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
+ tools: Path = Path("/"),
+ sandbox: Sequence[PathString] = (),
) -> None:
subvolume = (use_subvolumes == ConfigFeature.enabled or
- use_subvolumes == ConfigFeature.auto and shutil.which("btrfs") is not None)
+ use_subvolumes == ConfigFeature.auto and find_binary("btrfs", root=tools) is not None)
- if use_subvolumes == ConfigFeature.enabled and not shutil.which("btrfs"):
+ if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
die("Subvolumes requested but the btrfs command was not found")
copy: list[PathString] = [
if (
not subvolume or
not preserve or
- not is_subvolume(src) or
- not shutil.which("btrfs") or
+ not is_subvolume(src, sandbox=sandbox) or
+ not find_binary("btrfs", root=tools) or
(dst.exists() and any(dst.iterdir()))
):
with (
if not preserve
else contextlib.nullcontext()
):
- run(copy)
+ run(copy, sandbox=sandbox)
return
# btrfs can't snapshot to an existing directory so make sure the destination does not exist.
dst.rmdir()
result = run(["btrfs", "subvolume", "snapshot", src, dst],
- check=use_subvolumes == ConfigFeature.enabled).returncode
+ check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox).returncode
if result != 0:
with (
preserve_target_directories_stat(src, dst)
if not preserve
else contextlib.nullcontext()
):
- run(copy)
+ run(copy, sandbox=sandbox)
-def rmtree(*paths: Path) -> None:
- run(["rm", "-rf", "--", *paths])
+def rmtree(*paths: Path, sandbox: Sequence[PathString] = ()) -> None:
+ if paths:
+ run(["rm", "-rf", "--", *paths], sandbox=sandbox)
-def move_tree(src: Path, dst: Path, use_subvolumes: ConfigFeature = ConfigFeature.disabled) -> None:
+def move_tree(
+ src: Path,
+ dst: Path,
+ *,
+ use_subvolumes: ConfigFeature = ConfigFeature.disabled,
+ tools: Path = Path("/"),
+ sandbox: Sequence[PathString] = (),
+) -> None:
if src == dst:
return
if e.errno != errno.EXDEV:
raise e
- copy_tree(src, dst, use_subvolumes=use_subvolumes)
- rmtree(src)
+ copy_tree(src, dst, use_subvolumes=use_subvolumes, tools=tools, sandbox=sandbox)
+ rmtree(src, sandbox=sandbox)
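The tree helpers now thread both the tools tree and the sandbox prefix through to run(); a minimal sketch of the assumed call pattern (paths invented):

from pathlib import Path

from mkosi.config import ConfigFeature
from mkosi.sandbox import sandbox_cmd
from mkosi.tree import copy_tree, rmtree

tools = Path("/srv/tools")  # hypothetical tools tree
sandbox = sandbox_cmd(tools=tools)

copy_tree(Path("/srv/src"), Path("/srv/dst"), use_subvolumes=ConfigFeature.auto, tools=tools, sandbox=sandbox)
rmtree(Path("/srv/dst"), sandbox=sandbox)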
os.chmod(path, st.st_mode | stat.S_IEXEC)
-def try_import(module: str) -> None:
- try:
- importlib.import_module(module)
- except ModuleNotFoundError:
- pass
-
-
@contextlib.contextmanager
def flock(path: Path) -> Iterator[int]:
fd = os.open(path, os.O_CLOEXEC|os.O_RDONLY)
import subprocess
import tempfile
import textwrap
+import time
from collections.abc import Iterator
from pathlib import Path
import pytest
from mkosi.distributions import Distribution
+from mkosi.log import die
from mkosi.mounts import mount
-from mkosi.run import run
+from mkosi.run import find_binary, run
from mkosi.tree import copy_tree
+from mkosi.types import PathString
from mkosi.util import INVOKING_USER
+from mkosi.versioncomp import GenericVersion
from . import Image
image.qemu()
+def wait_for_device(device: PathString) -> None:
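+ # udevadm gained the "wait" verb in systemd 251; on older systems, fall back to
+ # polling for the device node.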
+ if (
+ find_binary("udevadm") and
+ GenericVersion(run(["udevadm", "--version"], stdout=subprocess.PIPE).stdout.strip()) >= 251
+ ):
+ run(["udevadm", "wait", "--timeout=30", "/dev/vg_mkosi/lv0"])
+ return
+
+ for _ in range(30):
+ if Path(device).exists():
+ return
+
+ time.sleep(1)
+
+ die(f"Device {device} did not appear within 30 seconds")
+
+
@pytest.mark.skipif(os.getuid() != 0, reason="mkosi-initrd LVM test can only be executed as root")
def test_initrd_lvm(initrd: Image) -> None:
with Image(
stack.callback(lambda: run(["vgchange", "-an", "vg_mkosi"]))
run(["lvm", "lvcreate", "-l", "100%FREE", "-n", "lv0", "vg_mkosi"])
run(["lvm", "lvs"])
- run(["udevadm", "wait", "/dev/vg_mkosi/lv0"])
+ wait_for_device("/dev/vg_mkosi/lv0")
run([f"mkfs.{image.config.distribution.filesystem()}", "-L", "root", "/dev/vg_mkosi/lv0"])
with tempfile.TemporaryDirectory() as mnt, mount(Path("/dev/vg_mkosi/lv0"), Path(mnt)):
stack.callback(lambda: run(["vgchange", "-an", "vg_mkosi"]))
run(["lvm", "lvcreate", "-l", "100%FREE", "-n", "lv0", "vg_mkosi"])
run(["lvm", "lvs"])
- run(["udevadm", "wait", "/dev/vg_mkosi/lv0"])
+ wait_for_device("/dev/vg_mkosi/lv0")
run([f"mkfs.{image.config.distribution.filesystem()}", "-L", "root", "/dev/vg_mkosi/lv0"])
with tempfile.TemporaryDirectory() as mnt, mount(Path("/dev/vg_mkosi/lv0"), Path(mnt)):