from mkosi.pager import page
from mkosi.qemu import copy_ephemeral, machine_cid, run_qemu
from mkosi.remove import unlink_try_hard
-from mkosi.run import become_root, fork_and_wait, run, run_workspace_command, spawn
+from mkosi.run import (
+ become_root,
+ bwrap,
+ bwrap_cmd,
+ fork_and_wait,
+ run,
+ run_workspace_command,
+ spawn,
+)
from mkosi.state import MkosiState
from mkosi.types import PathString
from mkosi.util import (
shutil.unpack_archive(path, d)
bases += [d]
elif path.suffix == ".raw":
+ # We want to use bwrap() here but it doesn't propagate mounts so we use run() instead.
run(["systemd-dissect", "-M", path, d])
stack.callback(lambda: run(["systemd-dissect", "-U", d]))
bases += [d]
d = state.workspace / "build-overlay"
if not d.is_symlink():
d.mkdir(mode=0o755, exist_ok=True)
- return mount_overlay([state.root], state.workspace.joinpath("build-overlay"), state.root, read_only)
+ return mount_overlay([state.root], state.workspace / "build-overlay", state.root, read_only)
def run_prepare_script(state: MkosiState, build: bool) -> None:
return
with complete_step("Running finalize script…"):
- run([state.config.finalize_script],
- env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.staging)})
+ bwrap([state.config.finalize_script],
+ root=state.config.tools_tree,
+ env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.staging)})
-def certificate_common_name(certificate: Path) -> str:
- output = run([
+def certificate_common_name(state: MkosiState, certificate: Path) -> str:
+ output = bwrap([
"openssl",
"x509",
"-noout",
"-subject",
"-nameopt", "multiline",
"-in", certificate,
- ], text=True, stdout=subprocess.PIPE).stdout
+ ], root=state.config.tools_tree, stdout=subprocess.PIPE).stdout
for line in output.splitlines():
if not line.strip().startswith("commonName"):
# pesign takes a certificate directory and a certificate common name as input arguments, so we have
# to transform our input key and cert into that format. Adapted from
# https://www.mankier.com/1/pesign#Examples-Signing_with_the_certificate_and_private_key_in_individual_files
- run(["openssl",
- "pkcs12",
- "-export",
- # Arcane incantation to create a pkcs12 certificate without a password.
- "-keypbe", "NONE",
- "-certpbe", "NONE",
- "-nomaciter",
- "-passout", "pass:",
- "-out", state.workspace / "secure-boot.p12",
- "-inkey", state.config.secure_boot_key,
- "-in", state.config.secure_boot_certificate])
-
- run(["pk12util",
- "-K", "",
- "-W", "",
- "-i", state.workspace / "secure-boot.p12",
- "-d", state.workspace / "pesign"])
+ bwrap(["openssl",
+ "pkcs12",
+ "-export",
+ # Arcane incantation to create a pkcs12 certificate without a password.
+ "-keypbe", "NONE",
+ "-certpbe", "NONE",
+ "-nomaciter",
+ "-passout", "pass:",
+ "-out", state.workspace / "secure-boot.p12",
+ "-inkey", state.config.secure_boot_key,
+ "-in", state.config.secure_boot_certificate],
+ root=state.config.tools_tree)
+
+ bwrap(["pk12util",
+ "-K", "",
+ "-W", "",
+ "-i", state.workspace / "secure-boot.p12",
+ "-d", state.workspace / "pesign"],
+ root=state.config.tools_tree)
def install_boot_loader(state: MkosiState) -> None:
if (state.config.secure_boot_sign_tool == SecureBootSignTool.sbsign or
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("sbsign") is not None):
- run(["sbsign",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", output,
- input])
+ bwrap(["sbsign",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", output,
+ input],
+ root=state.config.tools_tree)
elif (state.config.secure_boot_sign_tool == SecureBootSignTool.pesign or
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("pesign") is not None):
pesign_prepare(state)
- run(["pesign",
- "--certdir", state.workspace / "pesign",
- "--certificate", certificate_common_name(state.config.secure_boot_certificate),
- "--sign",
- "--force",
- "--in", input,
- "--out", output])
+ bwrap(["pesign",
+ "--certdir", state.workspace / "pesign",
+ "--certificate", certificate_common_name(state, state.config.secure_boot_certificate),
+ "--sign",
+ "--force",
+ "--in", input,
+ "--out", output],
+ root=state.config.tools_tree)
else:
die("One of sbsign or pesign is required to use SecureBoot=")
with complete_step("Installing boot loader…"):
- run(["bootctl", "install", "--root", state.root, "--all-architectures"], env={"SYSTEMD_ESP_PATH": "/efi"})
+ bwrap(["bootctl", "install", "--root", state.root, "--all-architectures"],
+ env={"SYSTEMD_ESP_PATH": "/efi"}, root=state.config.tools_tree)
if state.config.secure_boot:
assert state.config.secure_boot_key
keys.mkdir(parents=True, exist_ok=True)
# sbsiglist expects a DER certificate.
- run(["openssl",
- "x509",
- "-outform", "DER",
- "-in", state.config.secure_boot_certificate,
- "-out", state.workspace / "mkosi.der"])
- run(["sbsiglist",
- "--owner", str(uuid.uuid4()),
- "--type", "x509",
- "--output", state.workspace / "mkosi.esl",
- state.workspace / "mkosi.der"])
+ bwrap(["openssl",
+ "x509",
+ "-outform", "DER",
+ "-in", state.config.secure_boot_certificate,
+ "-out", state.workspace / "mkosi.der"],
+ root=state.config.tools_tree)
+ bwrap(["sbsiglist",
+ "--owner", str(uuid.uuid4()),
+ "--type", "x509",
+ "--output", state.workspace / "mkosi.esl",
+ state.workspace / "mkosi.der"],
+ root=state.config.tools_tree)
# We reuse the key for all secure boot databases to keep things simple.
for db in ["PK", "KEK", "db"]:
- run(["sbvarsign",
- "--attr",
- "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", keys / f"{db}.auth",
- db,
- state.workspace / "mkosi.esl"])
+ bwrap(["sbvarsign",
+ "--attr",
+ "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", keys / f"{db}.auth",
+ db,
+ state.workspace / "mkosi.esl"],
+ root=state.config.tools_tree)
def install_base_trees(state: MkosiState) -> None:
elif path.suffix == ".tar":
shutil.unpack_archive(path, state.root)
elif path.suffix == ".raw":
- run(["systemd-dissect", "--copy-from", path, "/", state.root])
+ bwrap(["systemd-dissect", "--copy-from", path, "/", state.root],
+ root=state.config.tools_tree)
else:
die(f"Unsupported base tree source {path}")
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False)
+ copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
else:
shutil.unpack_archive(source, t)
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False)
+ copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
else:
shutil.unpack_archive(source, t)
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False)
+ copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
else:
shutil.unpack_archive(source, t)
return
with complete_step("Copying in build tree…"):
- copy_path(state.install_dir, state.root)
+ copy_path(state.install_dir, state.root, root=state.config.tools_tree)
def gzip_binary() -> str:
]
with complete_step("Creating archive…"):
- run(cmd)
+ bwrap(cmd, root=state.config.tools_tree)
def find_files(dir: Path, root: Path) -> Iterator[Path]:
if state.config.output_format != OutputFormat.cpio:
return
- make_cpio(state.root, find_files(state.root, state.root), state.staging / state.config.output_with_format)
+ make_cpio(state, find_files(state.root, state.root), state.staging / state.config.output_with_format)
-def make_cpio(root: Path, files: Iterator[Path], output: Path) -> None:
- with complete_step(f"Creating cpio {output}…"):
+def make_cpio(state: MkosiState, files: Iterator[Path], output: Path) -> None:
+ with complete_step(f"Creating cpio {output}…"), bwrap_cmd(root=state.config.tools_tree) as bwrap:
cmd: list[PathString] = [
- "cpio", "-o", "--reproducible", "--null", "-H", "newc", "--quiet", "-D", root, "-O", output
+ *bwrap,
+ "cpio",
+ "-o",
+ "--reproducible",
+ "--null",
+ "-H", "newc",
+ "--quiet",
+ "-D", state.root,
+ "-O", output
]
with spawn(cmd, stdin=subprocess.PIPE, text=True) as cpio:
return path.name.partition(".")[0]
-def resolve_module_dependencies(root: Path, kver: str, modules: Sequence[str]) -> tuple[set[Path], set[Path]]:
+def resolve_module_dependencies(state: MkosiState, kver: str, modules: Sequence[str]) -> tuple[set[Path], set[Path]]:
"""
Returns a tuple of lists containing the paths to the module and firmware dependencies of the given list
of module names (including the given module paths themselves). The paths are returned relative to the
- given root directory.
+ root directory.
"""
modulesd = Path("usr/lib/modules") / kver
- builtin = set(module_path_to_name(Path(m)) for m in (root / modulesd / "modules.builtin").read_text().splitlines())
- allmodules = set((root / modulesd / "kernel").glob("**/*.ko*"))
- nametofile = {module_path_to_name(m): m.relative_to(root) for m in allmodules}
+ builtin = set(module_path_to_name(Path(m)) for m in (state.root / modulesd / "modules.builtin").read_text().splitlines())
+ allmodules = set((state.root / modulesd / "kernel").glob("**/*.ko*"))
+ nametofile = {module_path_to_name(m): m.relative_to(state.root) for m in allmodules}
# We could run modinfo once for each module but that's slow. Luckily we can pass multiple modules to
# modinfo and it'll process them all in a single go. We get the modinfo for all modules to build two maps
# that map the path of the module to its module dependencies and its firmware dependencies respectively.
- info = run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *nametofile.keys(), *builtin],
- text=True, stdout=subprocess.PIPE).stdout
+ info = bwrap(["modinfo", "--basedir", state.root, "--set-version", kver, "--null", *nametofile.keys(), *builtin],
+ stdout=subprocess.PIPE, root=state.config.tools_tree).stdout
moddep = {}
firmwaredep = {}
depends += [d for d in value.strip().split(",") if d]
elif key == "firmware":
- firmware += [f.relative_to(root) for f in root.joinpath("usr/lib/firmware").glob(f"{value.strip()}*")]
+ firmware += [f.relative_to(state.root) for f in state.root.joinpath("usr/lib/firmware").glob(f"{value.strip()}*")]
elif key == "name":
name = value.strip()
state.config.kernel_modules_initrd_exclude)
names = [module_path_to_name(m) for m in modules]
- mods, firmware = resolve_module_dependencies(state.root, kver, names)
+ mods, firmware = resolve_module_dependencies(state, kver, names)
for p in sorted(mods) + sorted(firmware):
yield p
kmods = state.workspace / f"initramfs-kernel-modules-{kver}.img"
with complete_step(f"Generating kernel modules initrd for kernel {kver}"):
- make_cpio(state.root, files(), kmods)
+ make_cpio(state, files(), kmods)
# Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that
# this is not ideal since the compressed kernel modules will all be decompressed on boot which
return
for kver, kimg in gen_kernel_images(state):
- copy_path(state.root / kimg, state.staging / state.config.output_split_kernel)
+ shutil.copy(state.root / kimg, state.staging / state.config.output_split_kernel)
break
if state.config.output_format == OutputFormat.cpio and state.config.bootable == ConfigFeature.auto:
"--repository-key-check", yes_no(state.config.repository_key_check),
"--repositories", ",".join(state.config.repositories),
"--package-manager-tree", ",".join(format_source_target(s, t) for s, t in state.config.package_manager_trees),
+ *(["--tools-tree", str(state.config.tools_tree)] if state.config.tools_tree else []),
*(["--compress-output", str(state.config.compress_output)] if state.config.compress_output else []),
"--with-network", yes_no(state.config.with_network),
"--cache-only", yes_no(state.config.cache_only),
cmd += [
"--signtool", "pesign",
"--secureboot-certificate-dir", state.workspace / "pesign",
- "--secureboot-certificate-name", certificate_common_name(state.config.secure_boot_certificate),
+ "--secureboot-certificate-name", certificate_common_name(state, state.config.secure_boot_certificate),
]
sign_expected_pcr = (state.config.sign_expected_pcr == ConfigFeature.enabled or
if state.config.kernel_modules_initrd:
cmd += [gen_kernel_modules_initrd(state, kver)]
- run(cmd)
+ bwrap(cmd, root=state.config.tools_tree)
if not state.staging.joinpath(state.config.output_split_uki).exists():
- copy_path(boot_binary, state.staging / state.config.output_split_uki)
+ shutil.copy(boot_binary, state.staging / state.config.output_split_uki)
print_output_size(boot_binary)
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- run(compressor_command(compression), user=state.uid, group=state.gid, stdin=i, stdout=o)
+ bwrap(compressor_command(compression), user=state.uid, group=state.gid, stdin=i, stdout=o,
+ root=state.config.tools_tree)
def copy_nspawn_settings(state: MkosiState) -> None:
return None
with complete_step("Copying nspawn settings file…"):
- copy_path(state.config.nspawn_settings, state.staging / state.config.output_nspawn_settings)
+ shutil.copy(state.config.nspawn_settings, state.staging / state.config.output_nspawn_settings)
def hash_file(of: TextIO, path: Path) -> None:
state.staging / state.config.output_checksum,
]
- run(
+ bwrap(
cmdline,
# Do not output warnings about keyring permissions
stderr=subprocess.DEVNULL,
'GNUPGHOME',
Path(os.environ['HOME']).joinpath('.gnupg')
)
- }
+ },
+ root=state.config.tools_tree,
)
for base in config.base_trees:
check_tree_input(base)
+ check_tree_input(config.tools_tree)
+
for tree in (config.skeleton_trees,
config.extra_trees):
for item in tree:
state.config.kernel_modules_exclude)
names = [module_path_to_name(m) for m in modules]
- mods, firmware = resolve_module_dependencies(state.root, kver, names)
+ mods, firmware = resolve_module_dependencies(state, kver, names)
allmodules = set(m.relative_to(state.root) for m in (state.root / modulesd).glob("**/*.ko*"))
allfirmware = set(m.relative_to(state.root) for m in (state.root / "usr/lib/firmware").glob("**/*") if not m.is_dir())
process_kernel_modules(state, kver)
with complete_step(f"Running depmod for {kver}"):
- run(["depmod", "--all", "--basedir", state.root, kver])
+ bwrap(["depmod", "--all", "--basedir", state.root, kver], root=state.config.tools_tree)
def run_sysusers(state: MkosiState) -> None:
with complete_step("Generating system users"):
- run(["systemd-sysusers", "--root", state.root])
+ bwrap(["systemd-sysusers", "--root", state.root], root=state.config.tools_tree)
def run_preset(state: MkosiState) -> None:
with complete_step("Applying presets…"):
- run(["systemctl", "--root", state.root, "preset-all"])
+ bwrap(["systemctl", "--root", state.root, "preset-all"], root=state.config.tools_tree)
def run_hwdb(state: MkosiState) -> None:
with complete_step("Generating hardware database"):
- run(["systemd-hwdb", "--root", state.root, "--usr", "--strict", "update"])
+ bwrap(["systemd-hwdb", "--root", state.root, "--usr", "--strict", "update"],
+ root=state.config.tools_tree)
def run_firstboot(state: MkosiState) -> None:
return
with complete_step("Applying first boot settings"):
- run(["systemd-firstboot", "--root", state.root, "--force", *options])
+ bwrap(["systemd-firstboot", "--root", state.root, "--force", *options],
+ root=state.config.tools_tree)
# Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
# /usr/lib/credstore.
if not selinux.exists():
return
- policy = run(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"], text=True, stdout=subprocess.PIPE).stdout.strip()
+ policy = bwrap(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"],
+ stdout=subprocess.PIPE, root=state.config.tools_tree).stdout.strip()
if not policy:
return
env[option] = value
with complete_step("Generating disk image"):
- output = json.loads(run(cmdline, stdout=subprocess.PIPE, env=env).stdout)
+ output = json.loads(bwrap(cmdline, stdout=subprocess.PIPE, env=env,
+ root=state.config.tools_tree).stdout)
roothash = usrhash = None
for p in output:
bwrap_params=bwrap, stdout=sys.stdout, env=env | state.environment)
-def setfacl(root: Path, uid: int, allow: bool) -> None:
- run(["setfacl",
- "--physical",
- "--modify" if allow else "--remove",
- f"user:{uid}:rwx" if allow else f"user:{uid}",
- "-"],
- text=True,
- # Supply files via stdin so we don't clutter --debug run output too much
- input="\n".join([str(root),
- *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
+def setfacl(config: MkosiConfig, root: Path, uid: int, allow: bool) -> None:
+ bwrap(["setfacl",
+ "--physical",
+ "--modify" if allow else "--remove",
+ f"user:{uid}:rwx" if allow else f"user:{uid}",
+ "-"],
+ root=config.tools_tree,
+ # Supply files via stdin so we don't clutter --debug run output too much
+ input="\n".join([str(root),
+ *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
)
# getfacl complains about absolute paths so make sure we pass a relative one.
if root.exists():
- has_acl = f"user:{uid}:rwx" in run(["getfacl", "-n", root.relative_to(Path.cwd())], stdout=subprocess.PIPE, text=True).stdout
+ has_acl = f"user:{uid}:rwx" in bwrap([
+ "getfacl", "-n", root.relative_to(Path.cwd())],
+ stdout=subprocess.PIPE,
+ root=config.tools_tree,
+ ).stdout
+
if not has_acl and not always:
yield
return
try:
if has_acl:
with complete_step(f"Removing ACLs from {root}"):
- setfacl(root, uid, allow=False)
+ setfacl(config, root, uid, allow=False)
yield
finally:
if has_acl or always:
with complete_step(f"Adding ACLs to {root}"):
- setfacl(root, uid, allow=True)
+ setfacl(config, root, uid, allow=True)
@contextlib.contextmanager
fname = config.output_dir / config.output
if config.output_format == OutputFormat.disk and args.verb == Verb.boot:
- run(["systemd-repart",
- "--image", fname,
- "--size", "8G",
- "--no-pager",
- "--dry-run=no",
- "--offline=no",
- fname])
+ bwrap(["systemd-repart",
+ "--image", fname,
+ "--size", "8G",
+ "--no-pager",
+ "--dry-run=no",
+ "--offline=no",
+ fname],
+ root=config.tools_tree)
if config.output_format == OutputFormat.directory:
cmdline += ["--directory", fname]
stack.enter_context(acl_toggle_boot(config))
- run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
+ bwrap(cmdline,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ env=os.environ,
+ log=False,
+ root=config.tools_tree)
def run_ssh(args: MkosiArgs, config: MkosiConfig) -> None:
cmd += args.cmdline
- run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
+ bwrap(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False, root=config.tools_tree)
def run_serve(config: MkosiConfig) -> None: