from mkosi.pager import page
from mkosi.partition import Partition, finalize_root, finalize_roothash
from mkosi.qemu import KernelType, QemuDeviceNode, copy_ephemeral, run_qemu, run_ssh
-from mkosi.run import (
- become_root,
- find_binary,
- fork_and_wait,
- init_mount_namespace,
- run,
- run_openssl,
-)
+from mkosi.run import become_root, find_binary, fork_and_wait, init_mount_namespace, run
from mkosi.state import MkosiState
from mkosi.tree import copy_tree, move_tree, rmtree
from mkosi.types import PathString
extract_tar(state, path, d)
bases += [d]
elif path.suffix == ".raw":
- run(["systemd-dissect", "-M", path, d])
- stack.callback(lambda: run(["systemd-dissect", "-U", d]))
+ bwrap(state, ["systemd-dissect", "-M", path, d])
+ stack.callback(lambda: bwrap(state, ["systemd-dissect", "-U", d]))
bases += [d]
else:
die(f"Unsupported base tree source {path}")
)
-def certificate_common_name(certificate: Path) -> str:
- output = run_openssl([
- "x509",
- "-noout",
- "-subject",
- "-nameopt", "multiline",
- "-in", certificate,
- ], stdout=subprocess.PIPE).stdout
+def certificate_common_name(state: MkosiState, certificate: Path) -> str:
+ output = bwrap(
+ state,
+ [
+ "openssl",
+ "x509",
+ "-noout",
+ "-subject",
+ "-nameopt", "multiline",
+ "-in", certificate,
+ ],
+ stdout=subprocess.PIPE,
+ ).stdout
for line in output.splitlines():
if not line.strip().startswith("commonName"):
# pesign takes a certificate directory and a certificate common name as input arguments, so we have
# to transform our input key and cert into that format. Adapted from
# https://www.mankier.com/1/pesign#Examples-Signing_with_the_certificate_and_private_key_in_individual_files
- run(["openssl",
- "pkcs12",
- "-export",
- # Arcane incantation to create a pkcs12 certificate without a password.
- "-keypbe", "NONE",
- "-certpbe", "NONE",
- "-nomaciter",
- "-passout", "pass:",
- "-out", state.workspace / "secure-boot.p12",
- "-inkey", state.config.secure_boot_key,
- "-in", state.config.secure_boot_certificate])
-
- run(["pk12util",
- "-K", "",
- "-W", "",
- "-i", state.workspace / "secure-boot.p12",
- "-d", state.workspace / "pesign"])
+ bwrap(
+ state,
+ [
+ "openssl",
+ "pkcs12",
+ "-export",
+ # Arcane incantation to create a pkcs12 certificate without a password.
+ "-keypbe", "NONE",
+ "-certpbe", "NONE",
+ "-nomaciter",
+ "-passout", "pass:",
+ "-out", state.workspace / "secure-boot.p12",
+ "-inkey", state.config.secure_boot_key,
+ "-in", state.config.secure_boot_certificate,
+ ],
+ )
+
+ bwrap(
+ state,
+ [
+ "pk12util",
+ "-K", "",
+ "-W", "",
+ "-i", state.workspace / "secure-boot.p12",
+ "-d", state.workspace / "pesign",
+ ],
+ )
def efi_boot_binary(state: MkosiState) -> Path:
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("sbsign") is not None
):
- run([
- "sbsign",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", output,
- input,
- ])
+ bwrap(
+ state,
+ [
+ "sbsign",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", output,
+ input,
+ ],
+ )
elif (
state.config.secure_boot_sign_tool == SecureBootSignTool.pesign or
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("pesign") is not None
):
pesign_prepare(state)
- run([
- "pesign",
- "--certdir", state.workspace / "pesign",
- "--certificate", certificate_common_name(state.config.secure_boot_certificate),
- "--sign",
- "--force",
- "--in", input,
- "--out", output,
- ])
+ bwrap(
+ state,
+ [
+ "pesign",
+ "--certdir", state.workspace / "pesign",
+ "--certificate", certificate_common_name(state, state.config.secure_boot_certificate),
+ "--sign",
+ "--force",
+ "--in", input,
+ "--out", output,
+ ],
+ )
else:
die("One of sbsign or pesign is required to use SecureBoot=")
sign_efi_binary(state, input, output)
with complete_step("Installing systemd-boot…"):
- run(["bootctl", "install", "--root", state.root, "--all-architectures", "--no-variables"],
- env={"SYSTEMD_ESP_PATH": "/efi", "SYSTEMD_LOG_LEVEL": "debug"})
+ bwrap(
+ state,
+ ["bootctl", "install", "--root", state.root, "--all-architectures", "--no-variables"],
+ env={"SYSTEMD_ESP_PATH": "/efi", "SYSTEMD_LOG_LEVEL": "debug"},
+ )
if state.config.shim_bootloader != ShimBootloader.none:
shutil.copy2(
keys.mkdir(parents=True, exist_ok=True)
# sbsiglist expects a DER certificate.
- run_openssl(["x509",
- "-outform", "DER",
- "-in", state.config.secure_boot_certificate,
- "-out", state.workspace / "mkosi.der"])
- run(["sbsiglist",
- "--owner", str(uuid.uuid4()),
- "--type", "x509",
- "--output", state.workspace / "mkosi.esl",
- state.workspace / "mkosi.der"])
+ bwrap(
+ state,
+ [
+ "openssl",
+ "x509",
+ "-outform", "DER",
+ "-in", state.config.secure_boot_certificate,
+ "-out", state.workspace / "mkosi.der",
+ ],
+ )
+
+ bwrap(
+ state,
+ [
+ "sbsiglist",
+ "--owner", str(uuid.uuid4()),
+ "--type", "x509",
+ "--output", state.workspace / "mkosi.esl",
+ state.workspace / "mkosi.der",
+ ],
+ )
# We reuse the key for all secure boot databases to keep things simple.
for db in ["PK", "KEK", "db"]:
- run(["sbvarsign",
- "--attr",
- "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", keys / f"{db}.auth",
- db,
- state.workspace / "mkosi.esl"])
+ bwrap(
+ state,
+ [
+ "sbvarsign",
+ "--attr",
+ "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", keys / f"{db}.auth",
+ db,
+ state.workspace / "mkosi.esl",
+ ],
+ )
def find_and_install_shim_binary(
# this is not ideal since the compressed kernel modules will all be decompressed on boot which
# requires significant memory.
if state.config.distribution.is_apt_distribution():
- maybe_compress(state.config, Compression.zstd, kmods, kmods)
+ maybe_compress(state, Compression.zstd, kmods, kmods)
return kmods
"""
)
- run([python_binary(state.config)], input=pefile)
+ bwrap(state, [python_binary(state.config)], input=pefile)
def build_uki(
cmd += [
"--signtool", "pesign",
"--secureboot-certificate-dir", state.workspace / "pesign",
- "--secureboot-certificate-name", certificate_common_name(state.config.secure_boot_certificate),
+ "--secureboot-certificate-name", certificate_common_name(state, state.config.secure_boot_certificate),
]
sign_expected_pcr = (state.config.sign_expected_pcr == ConfigFeature.enabled or
cmd += ["--initrd", initrd]
with complete_step(f"Generating unified kernel image for kernel version {kver}"):
- run(cmd)
+ bwrap(state, cmd)
def want_efi(config: MkosiConfig) -> bool:
):
return state.config.image_id or state.config.distribution.name
- output = json.loads(run(["kernel-install", "--root", state.root, "--json=pretty", "inspect"],
+ output = json.loads(bwrap(state, ["kernel-install", "--root", state.root, "--json=pretty", "inspect"],
stdout=subprocess.PIPE).stdout)
logging.debug(json.dumps(output, indent=4))
return cast(str, output["EntryToken"])
def make_uki(state: MkosiState, stub: Path, kver: str, kimg: Path, output: Path) -> None:
microcode = build_microcode_initrd(state)
make_cpio(state, state.root, state.workspace / "initrd")
- maybe_compress(state.config, state.config.compress_output, state.workspace / "initrd", state.workspace / "initrd")
+ maybe_compress(state, state.config.compress_output, state.workspace / "initrd", state.workspace / "initrd")
initrds = [microcode] if microcode else []
initrds += [state.workspace / "initrd"]
die(f"Unknown compression {compression}")
-def maybe_compress(config: MkosiConfig, compression: Compression, src: Path, dst: Optional[Path] = None) -> None:
+def maybe_compress(state: MkosiState, compression: Compression, src: Path, dst: Optional[Path] = None) -> None:
if not compression or src.is_dir():
if dst:
- move_tree(src, dst, use_subvolumes=config.use_subvolumes)
+ move_tree(src, dst, use_subvolumes=state.config.use_subvolumes)
return
if not dst:
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- run(compressor_command(compression), stdin=i, stdout=o)
+ bwrap(state, compressor_command(compression), stdin=i, stdout=o)
def copy_vmlinuz(state: MkosiState) -> None:
if sys.stderr.isatty():
env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno()))
- run(
- cmdline,
- # Do not output warnings about keyring permissions
- stderr=subprocess.DEVNULL,
- env=env,
- )
+ # Do not output warnings about keyring permissions
+ bwrap(state, cmdline, stderr=subprocess.DEVNULL, env=env)
def dir_size(path: Union[Path, os.DirEntry[str]]) -> int:
)
with complete_step(f"Running depmod for {kver}"):
- run(["depmod", "--all", "--basedir", state.root, kver])
+ bwrap(state, ["depmod", "--all", "--basedir", state.root, kver])
def run_sysusers(state: MkosiState) -> None:
return
with complete_step("Generating system users"):
- run(["systemd-sysusers", "--root", state.root])
+ bwrap(state, ["systemd-sysusers", "--root", state.root])
def run_preset(state: MkosiState) -> None:
return
with complete_step("Applying presets…"):
- run(["systemctl", "--root", state.root, "preset-all"])
- run(["systemctl", "--root", state.root, "--global", "preset-all"])
+ bwrap(state, ["systemctl", "--root", state.root, "preset-all"])
+ bwrap(state, ["systemctl", "--root", state.root, "--global", "preset-all"])
def run_hwdb(state: MkosiState) -> None:
return
with complete_step("Generating hardware database"):
- run(["systemd-hwdb", "--root", state.root, "--usr", "--strict", "update"])
+ bwrap(state, ["systemd-hwdb", "--root", state.root, "--usr", "--strict", "update"])
# Remove any existing hwdb in /etc in favor of the one we just put in /usr.
(state.root / "etc/udev/hwdb.bin").unlink(missing_ok=True)
return
with complete_step("Applying first boot settings"):
- run(["systemd-firstboot", "--root", state.root, "--force", *options])
+ bwrap(state, ["systemd-firstboot", "--root", state.root, "--force", *options])
# Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
# /usr/lib/credstore.
die("SELinux relabel is requested but could not find selinux config at /etc/selinux/config")
return
- policy = run(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"], stdout=subprocess.PIPE).stdout.strip()
+ policy = bwrap(state, ["sh", "-c", f". {selinux} && echo $SELINUXTYPE"], stdout=subprocess.PIPE).stdout.strip()
if not policy:
if state.config.selinux_relabel == ConfigFeature.enabled:
die("SELinux relabel is requested but no selinux policy is configured in /etc/selinux/config")
die(f"SELinux binary policy not found in {binpolicydir}")
with complete_step(f"Relabeling files using {policy} policy"):
- run(["setfiles", "-mFr", state.root, "-c", binpolicy, fc, state.root],
- check=state.config.selinux_relabel == ConfigFeature.enabled)
+ bwrap(state, ["setfiles", "-mFr", state.root, "-c", binpolicy, fc, state.root],
+ check=state.config.selinux_relabel == ConfigFeature.enabled)
def need_build_overlay(config: MkosiConfig) -> bool:
}
with complete_step(msg):
- output = json.loads(run(cmdline, stdout=subprocess.PIPE, env=env).stdout)
+ output = json.loads(
+ bwrap(state, cmdline, devices=not state.config.repart_offline, stdout=subprocess.PIPE, env=env).stdout
+ )
logging.debug(json.dumps(output, indent=4))
if split:
for p in partitions:
if p.split_path:
- maybe_compress(state.config, state.config.compress_output, p.split_path)
+ maybe_compress(state, state.config.compress_output, p.split_path)
return partitions
resource_path(mkosi.resources) as r,
complete_step(f"Building {state.config.output_format} extension image")
):
- run(cmdline + ["--definitions", r / f"repart/definitions/{state.config.output_format}.repart.d"], env=env)
+ bwrap(
+ state,
+ cmdline + ["--definitions", r / f"repart/definitions/{state.config.output_format}.repart.d"],
+ devices=not state.config.repart_offline,
+ env=env,
+ )
def finalize_staging(state: MkosiState) -> None:
state.root.rename(state.staging / state.config.output_with_format)
if config.output_format not in (OutputFormat.uki, OutputFormat.esp):
- maybe_compress(state.config, state.config.compress_output,
+ maybe_compress(state, state.config.compress_output,
state.staging / state.config.output_with_format,
state.staging / state.config.output_with_compression)
)
)
- run_openssl(["req",
- "-new",
- "-x509",
- "-newkey", f"rsa:{keylength}",
- "-keyout", "mkosi.key",
- "-out", "mkosi.crt",
- "-days", str(args.genkey_valid_days),
- "-subj", f"/CN={cn}/",
- "-nodes"])
+ run(
+ [
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-newkey", f"rsa:{keylength}",
+ "-keyout", "mkosi.key",
+ "-out", "mkosi.crt",
+ "-days", str(args.genkey_valid_days),
+ "-subj", f"/CN={cn}/",
+ "-nodes"
+ ],
+ env=dict(OPENSSL_CONF="/dev/null"),
+ )
def bump_image_version() -> None: