state.partition_table = table
-def create_image(state: MkosiState, for_cache: bool) -> Optional[BinaryIO]:
+def create_image(state: MkosiState) -> Optional[BinaryIO]:
initialize_partition_table(state, force=True)
if state.partition_table is None:
return None
f: BinaryIO = cast(
BinaryIO,
- tempfile.NamedTemporaryFile(prefix=".mkosi-", delete=not for_cache, dir=state.config.output.parent),
+ tempfile.NamedTemporaryFile(prefix=".mkosi-", delete=not state.for_cache, dir=state.config.output.parent),
)
output.append(f)
disable_cow(f.name)
return f
-def reuse_cache_image(state: MkosiState, for_cache: bool) -> Tuple[Optional[BinaryIO], bool]:
+def reuse_cache_image(state: MkosiState) -> Tuple[Optional[BinaryIO], bool]:
if not state.config.incremental:
return None, False
if not state.config.output_format.is_disk_rw():
return None, False
fname = state.cache_pre_dev if state.do_run_build_script else state.cache_pre_inst
- if for_cache:
+ if state.for_cache:
if fname and os.path.exists(fname):
# Cache already generated, skip generation, note that manually removing the exising cache images is
# necessary if Packages or BuildPackages change
remove(state.config.remove_packages)
-def reset_machine_id(state: MkosiState, for_cache: bool) -> None:
+def reset_machine_id(state: MkosiState) -> None:
"""Make /etc/machine-id an empty file.
This way, on the next boot is either initialized and committed (if /etc is
if state.do_run_build_script:
return
- if for_cache:
+ if state.for_cache:
return
with complete_step("Resetting machine ID"):
patch_file(state.root / "etc/shadow", set_root_pw)
-def invoke_fstrim(state: MkosiState, for_cache: bool) -> None:
+def invoke_fstrim(state: MkosiState) -> None:
if state.do_run_build_script:
return
return
if not state.config.output_format.is_disk():
return
- if for_cache:
+ if state.for_cache:
return
with complete_step("Trimming File System"):
os.unlink(root_home(state) / "prepare")
-def run_postinst_script(
- state: MkosiState, loopdev: Optional[Path], for_cache: bool
-) -> None:
+def run_postinst_script(state: MkosiState) -> None:
if state.config.postinst_script is None:
return
- if for_cache:
+ if state.for_cache:
return
verb = "build" if state.do_run_build_script else "final"
return config.output_dir or Path(os.getcwd())
-def run_finalize_script(state: MkosiState, for_cache: bool) -> None:
+def run_finalize_script(state: MkosiState) -> None:
if state.config.finalize_script is None:
return
- if for_cache:
+ if state.for_cache:
return
verb = "build" if state.do_run_build_script else "final"
run_workspace_command(state, ["bootctl", "install"])
-def install_extra_trees(state: MkosiState, for_cache: bool) -> None:
+def install_extra_trees(state: MkosiState) -> None:
if not state.config.extra_trees:
return
- if for_cache:
+ if state.for_cache:
return
with complete_step("Copying in extra file trees…"):
copy_file(src_path, dest_path)
-def install_build_src(state: MkosiState, for_cache: bool) -> None:
- if for_cache:
+def install_build_src(state: MkosiState) -> None:
+ if state.for_cache:
return
if state.do_run_build_script:
shutil.copytree(state.config.build_sources, target, symlinks=not resolve_symlinks, ignore=ignore)
-def install_build_dest(state: MkosiState, for_cache: bool) -> None:
+def install_build_dest(state: MkosiState) -> None:
if state.do_run_build_script:
return
- if for_cache:
+ if state.for_cache:
return
if state.config.build_script is None:
copy_path(install_dir(state), state.root, copystat=False)
-def make_read_only(config: MkosiConfig, root: Path, for_cache: bool, b: bool = True) -> None:
- if not config.read_only:
+def make_read_only(state: MkosiState, directory: Path, b: bool = True) -> None:
+ if not state.config.read_only:
return
- if for_cache:
+ if state.for_cache:
return
- if config.output_format not in (OutputFormat.gpt_btrfs, OutputFormat.subvolume):
+ if state.config.output_format not in (OutputFormat.gpt_btrfs, OutputFormat.subvolume):
return
- if is_generated_root(config):
+ if is_generated_root(state.config):
return
- with complete_step("Marking root subvolume read-only"):
- btrfs_subvol_make_ro(root, b)
+ with complete_step("Marking subvolume read-only"):
+ btrfs_subvol_make_ro(directory, b)
def xz_binary() -> str:
return "gtar" if shutil.which("gtar") else "tar"
-def make_tar(state: MkosiState, for_cache: bool) -> Optional[BinaryIO]:
+def make_tar(state: MkosiState) -> Optional[BinaryIO]:
if state.do_run_build_script:
return None
if state.config.output_format != OutputFormat.tar:
return None
- if for_cache:
+ if state.for_cache:
return None
root_dir = state.root / "usr" if state.config.usr_only else state.root
lambda entry: Path(entry.path).relative_to(root))
-def make_cpio(
- state: MkosiState, for_cache: bool
-) -> Optional[BinaryIO]:
+def make_cpio(state: MkosiState) -> Optional[BinaryIO]:
if state.do_run_build_script:
return None
if state.config.output_format != OutputFormat.cpio:
return None
- if for_cache:
+ if state.for_cache:
return None
root_dir = state.root / "usr" if state.config.usr_only else state.root
return f
-def generate_squashfs(config: MkosiConfig, root: Path, for_cache: bool) -> Optional[BinaryIO]:
- if not config.output_format.is_squashfs():
+def generate_squashfs(state: MkosiState, directory: Path) -> Optional[BinaryIO]:
+ if not state.config.output_format.is_squashfs():
return None
- if for_cache:
+ if state.for_cache:
return None
- command = config.mksquashfs_tool[0] if config.mksquashfs_tool else "mksquashfs"
- comp_args = config.mksquashfs_tool[1:] if config.mksquashfs_tool and config.mksquashfs_tool[1:] else ["-noappend"]
+ command = state.config.mksquashfs_tool[0] if state.config.mksquashfs_tool else "mksquashfs"
+ comp_args = state.config.mksquashfs_tool[1:] if state.config.mksquashfs_tool and state.config.mksquashfs_tool[1:] else ["-noappend"]
- compress = should_compress_fs(config)
+ compress = should_compress_fs(state.config)
# mksquashfs default is true, so no need to specify anything to have the default compression.
if isinstance(compress, str):
comp_args += ["-comp", compress]
with complete_step("Creating squashfs file system…"):
f: BinaryIO = cast(
- BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-squashfs", dir=os.path.dirname(config.output))
+ BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-squashfs", dir=os.path.dirname(state.config.output))
)
- run([command, root, f.name, *comp_args])
+ run([command, directory, f.name, *comp_args])
return f
-def generate_ext4(config: MkosiConfig, root: Path, label: str, for_cache: bool) -> Optional[BinaryIO]:
- if config.output_format != OutputFormat.gpt_ext4:
+def generate_ext4(state: MkosiState, directory: Path, label: str) -> Optional[BinaryIO]:
+ if state.config.output_format != OutputFormat.gpt_ext4:
return None
- if for_cache:
+ if state.for_cache:
return None
with complete_step("Creating ext4 root file system…"):
f: BinaryIO = cast(
- BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-mkfs-ext4", dir=os.path.dirname(config.output))
+ BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-mkfs-ext4", dir=os.path.dirname(state.config.output))
)
- f.truncate(config.root_size)
- run(["mkfs.ext4", "-I", "256", "-L", label, "-M", "/", "-d", root, f.name])
+ f.truncate(state.config.root_size)
+ run(["mkfs.ext4", "-I", "256", "-L", label, "-M", "/", "-d", directory, f.name])
- if config.minimize:
+ if state.config.minimize:
with complete_step("Minimizing ext4 root file system…"):
run(["resize2fs", "-M", f.name])
return f
-def generate_btrfs(config: MkosiConfig, root: Path, label: str, for_cache: bool) -> Optional[BinaryIO]:
- if config.output_format != OutputFormat.gpt_btrfs:
+def generate_btrfs(state: MkosiState, directory: Path, label: str) -> Optional[BinaryIO]:
+ if state.config.output_format != OutputFormat.gpt_btrfs:
return None
- if for_cache:
+ if state.for_cache:
return None
with complete_step("Creating minimal btrfs root file system…"):
- f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-mkfs-btrfs", dir=config.output.parent))
- f.truncate(config.root_size)
+ f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(prefix=".mkosi-mkfs-btrfs", dir=state.config.output.parent))
+ f.truncate(state.config.root_size)
cmdline: Sequence[PathString] = [
- "mkfs.btrfs", "-L", label, "-d", "single", "-m", "single", "--rootdir", root, f.name
+ "mkfs.btrfs", "-L", label, "-d", "single", "-m", "single", "--rootdir", directory, f.name
]
- if config.minimize:
+ if state.config.minimize:
try:
run([*cmdline, "--shrink"])
except subprocess.CalledProcessError:
return f
-def generate_xfs(state: MkosiState, directory: Path, label: str, for_cache: bool) -> Optional[BinaryIO]:
+def generate_xfs(state: MkosiState, directory: Path, label: str) -> Optional[BinaryIO]:
if state.config.output_format != OutputFormat.gpt_xfs:
return None
- if for_cache:
+ if state.for_cache:
return None
with complete_step("Creating xfs root file system…"):
return f
-def make_generated_root(state: MkosiState, for_cache: bool) -> Optional[BinaryIO]:
+def make_generated_root(state: MkosiState) -> Optional[BinaryIO]:
if not is_generated_root(state.config):
return None
patched_root = state.root / "usr" if state.config.usr_only else state.root
if state.config.output_format == OutputFormat.gpt_xfs:
- return generate_xfs(state, patched_root, label, for_cache)
+ return generate_xfs(state, patched_root, label)
if state.config.output_format == OutputFormat.gpt_ext4:
- return generate_ext4(state.config, patched_root, label, for_cache)
+ return generate_ext4(state, patched_root, label)
if state.config.output_format == OutputFormat.gpt_btrfs:
- return generate_btrfs(state.config, patched_root, label, for_cache)
+ return generate_btrfs(state, patched_root, label)
if state.config.output_format.is_squashfs():
- return generate_squashfs(state.config, patched_root, for_cache)
+ return generate_squashfs(state, patched_root)
return None
raw: Optional[BinaryIO],
loopdev: Optional[Path],
image: Optional[BinaryIO],
- for_cache: bool,
) -> Optional[Partition]:
if not is_generated_root(state.config):
return None
if not state.config.output_format.is_disk():
return None
- if for_cache:
+ if state.for_cache:
return None
assert raw is not None
assert loopdev is not None
read_only=state.config.read_only)
-def make_verity(state: MkosiState, dev: Optional[Path], for_cache: bool) -> Tuple[Optional[BinaryIO], Optional[str]]:
+def make_verity(state: MkosiState, dev: Optional[Path]) -> Tuple[Optional[BinaryIO], Optional[str]]:
if state.do_run_build_script or state.config.verity is False:
return None, None
- if for_cache:
+ if state.for_cache:
return None, None
assert dev is not None
loopdev: Optional[Path],
verity: Optional[BinaryIO],
root_hash: Optional[str],
- for_cache: bool,
) -> Optional[Partition]:
if verity is None:
return None
- if for_cache:
+ if state.for_cache:
return None
assert loopdev is not None
assert raw is not None
def make_verity_sig(
- state: MkosiState, root_hash: Optional[str], for_cache: bool
+ state: MkosiState, root_hash: Optional[str]
) -> Tuple[Optional[BinaryIO], Optional[bytes], Optional[str]]:
if state.do_run_build_script or state.config.verity != "signed":
return None, None, None
- if for_cache:
+ if state.for_cache:
return None, None, None
assert root_hash is not None
verity_sig: Optional[BinaryIO],
root_hash: Optional[str],
fingerprint: Optional[str],
- for_cache: bool,
) -> Optional[Partition]:
if verity_sig is None:
return None
- if for_cache:
+ if state.for_cache:
return None
assert loopdev is not None
assert raw is not None
part_uuid=u)
-def patch_root_uuid(
- state: MkosiState, loopdev: Optional[Path], root_hash: Optional[str], for_cache: bool
-) -> None:
+def patch_root_uuid(state: MkosiState, loopdev: Optional[Path], root_hash: Optional[str]) -> None:
if root_hash is None:
return
assert loopdev is not None
- if for_cache:
+ if state.for_cache:
return
# Use the first 128bit of the root hash as partition UUID of the root partition
print('Root partition-type UUID:', u)
-def extract_partition(
- state: MkosiState, dev: Optional[Path], for_cache: bool
-) -> Optional[BinaryIO]:
-
- if state.do_run_build_script or for_cache or not state.config.split_artifacts:
+def extract_partition(state: MkosiState, dev: Optional[Path]) -> Optional[BinaryIO]:
+ if state.do_run_build_script or state.for_cache or not state.config.split_artifacts:
return None
assert dev is not None
def install_unified_kernel(
state: MkosiState,
root_hash: Optional[str],
- for_cache: bool,
- cached: bool,
mount: Callable[[], ContextManager[None]],
) -> None:
# Iterates through all kernel versions included in the image and generates a combined
# in the initrd, and not one from the cache. Hence even though
# dracut is slow we invoke it only during the last final build,
# never for the cached builds.
- if for_cache:
+ if state.for_cache:
return
# Don't bother running dracut if this is a development build. Strictly speaking it would probably be a
def secure_boot_sign(
state: MkosiState,
directory: Path,
- for_cache: bool,
cached: bool,
mount: Callable[[], ContextManager[None]],
replace: bool = False,
return
if not state.config.secure_boot:
return
- if for_cache and state.config.verity:
+ if state.for_cache and state.config.verity:
return
if cached and state.config.verity is False:
return
def extract_unified_kernel(
state: MkosiState,
- for_cache: bool,
mount: Callable[[], ContextManager[None]],
) -> Optional[BinaryIO]:
- if state.do_run_build_script or for_cache or not state.config.split_artifacts or not state.config.bootable:
+ if state.do_run_build_script or state.for_cache or not state.config.split_artifacts or not state.config.bootable:
return None
with mount():
def extract_kernel_image_initrd(
state: MkosiState,
- for_cache: bool,
mount: Callable[[], ContextManager[None]],
) -> Union[Tuple[BinaryIO, BinaryIO], Tuple[None, None]]:
- if state.do_run_build_script or for_cache or not state.config.bootable:
+ if state.do_run_build_script or state.for_cache or not state.config.bootable:
return None, None
with mount():
def extract_kernel_cmdline(
state: MkosiState,
- for_cache: bool,
mount: Callable[[], ContextManager[None]],
) -> Optional[TextIO]:
- if state.do_run_build_script or for_cache or not state.config.bootable:
+ if state.do_run_build_script or state.for_cache or not state.config.bootable:
return None
with mount():
assert artifact is None
- make_read_only(state.config, state.root, for_cache=False, b=False)
+ make_read_only(state, state.root, b=False)
os.rename(state.root, state.config.output)
- make_read_only(state.config, state.config.output, for_cache=False, b=True)
+ make_read_only(state, state.config.output, b=True)
elif state.config.output_format.is_disk() or state.config.output_format in (
OutputFormat.plain_squashfs,
MkosiPrinter.info(" Netdev: " + yes_no(config.netdev))
-def reuse_cache_tree(
- state: MkosiState, for_cache: bool, cached: bool
-) -> bool:
+def reuse_cache_tree(state: MkosiState, cached: bool) -> bool:
"""If there's a cached version of this tree around, use it and
initialize our new root directly from it. Returns a boolean indicating
whether we are now operating on a cached version or not."""
if not state.config.incremental:
return False
- if for_cache:
+ if state.for_cache:
return False
if state.config.output_format.is_disk_rw():
return False
config.build_dir.mkdir(mode=0o755, exist_ok=True)
-def setup_ssh(state: MkosiState, for_cache: bool, cached: bool) -> Optional[TextIO]:
+def setup_ssh(state: MkosiState, cached: bool) -> Optional[TextIO]:
if state.do_run_build_script or not state.config.ssh:
return None
if not cached:
run(["systemctl", "--root", state.root, "enable", unit])
- if for_cache:
+ if state.for_cache:
return None
authorized_keys = root_home(state) / ".ssh/authorized_keys"
return Path(prefix) / state.machine_id / kver
-def run_kernel_install(state: MkosiState, for_cache: bool, cached: bool) -> None:
+def run_kernel_install(state: MkosiState, cached: bool) -> None:
if not state.config.bootable or state.do_run_build_script:
return
- if not state.config.cache_initrd and for_cache:
+ if not state.config.cache_initrd and state.for_cache:
return
if state.config.cache_initrd and cached:
state: MkosiState,
*,
manifest: Optional[Manifest] = None,
- for_cache: bool = False,
cleanup: bool = False,
) -> BuildOutput:
# If there's no build script set, there's no point in executing
make_build_dir(state.config)
- raw, cached = reuse_cache_image(state, for_cache)
- if for_cache and cached:
+ raw, cached = reuse_cache_image(state)
+ if state.for_cache and cached:
# Found existing cache image, exiting build_image
return BuildOutput.empty()
assert raw is not None
refresh_partition_table(state, raw)
else:
- raw = create_image(state, for_cache)
+ raw = create_image(state)
with attach_base_image(state.config.base_image, state.partition_table) as base_image, \
attach_image_loopback(raw, state.partition_table) as loopdev, \
with mount_image(state, cached, base_image, loopdev, encrypted.without_generated_root(state.config)):
prepare_tree(state, cached)
- cached_tree = reuse_cache_tree(state, for_cache, cached)
+ cached_tree = reuse_cache_tree(state, cached)
install_skeleton_trees(state, cached_tree)
install_distribution(state, cached_tree)
install_etc_locale(state.root, cached_tree)
install_etc_hostname(state, cached_tree)
run_prepare_script(state, cached_tree)
- install_build_src(state, for_cache)
- install_build_dest(state, for_cache)
- install_extra_trees(state, for_cache)
+ install_build_src(state)
+ install_build_dest(state)
+ install_extra_trees(state)
configure_dracut(state, cached_tree)
- run_kernel_install(state, for_cache, cached_tree)
+ run_kernel_install(state, cached_tree)
install_boot_loader(state, loopdev, cached_tree)
set_root_password(state, cached_tree)
set_serial_terminal(state, cached_tree)
set_autologin(state, cached_tree)
- sshkey = setup_ssh(state, for_cache, cached_tree)
+ sshkey = setup_ssh(state, cached_tree)
setup_netdev(state, cached_tree)
- run_postinst_script(state, loopdev, for_cache)
+ run_postinst_script(state)
# Sign systemd-boot / sd-boot EFI binaries
- secure_boot_sign(state, state.root / 'usr/lib/systemd/boot/efi', for_cache, cached,
+ secure_boot_sign(state, state.root / 'usr/lib/systemd/boot/efi', cached,
mount=contextlib.nullcontext)
if cleanup:
if cleanup:
clean_package_manager_metadata(state)
remove_files(state)
- reset_machine_id(state, for_cache)
+ reset_machine_id(state)
reset_random_seed(state.root)
- run_finalize_script(state, for_cache)
- invoke_fstrim(state, for_cache)
- make_read_only(state.config, state.root, for_cache)
+ run_finalize_script(state)
+ invoke_fstrim(state)
+ make_read_only(state, state.root)
- generated_root = make_generated_root(state, for_cache)
- generated_root_part = insert_generated_root(state, raw, loopdev, generated_root, for_cache)
+ generated_root = make_generated_root(state)
+ generated_root_part = insert_generated_root(state, raw, loopdev, generated_root)
split_root = (
- (generated_root or extract_partition(state, encrypted.root, for_cache))
+ (generated_root or extract_partition(state, encrypted.root))
if state.config.split_artifacts
else None
)
else:
root_for_verity = None
- verity, root_hash = make_verity(state, root_for_verity, for_cache)
+ verity, root_hash = make_verity(state, root_for_verity)
- patch_root_uuid(state, loopdev, root_hash, for_cache)
+ patch_root_uuid(state, loopdev, root_hash)
- insert_verity(state, raw, loopdev, verity, root_hash, for_cache)
+ insert_verity(state, raw, loopdev, verity, root_hash)
split_verity = verity if state.config.split_artifacts else None
- verity_sig, root_hash_p7s, fingerprint = make_verity_sig(state, root_hash, for_cache)
- insert_verity_sig(state, raw, loopdev, verity_sig, root_hash, fingerprint, for_cache)
+ verity_sig, root_hash_p7s, fingerprint = make_verity_sig(state, root_hash)
+ insert_verity_sig(state, raw, loopdev, verity_sig, root_hash, fingerprint)
split_verity_sig = verity_sig if state.config.split_artifacts else None
# This time we mount read-only, as we already generated
encrypted.without_generated_root(state.config),
root_read_only=True)
- install_unified_kernel(state, root_hash, for_cache, cached, mount)
+ install_unified_kernel(state, root_hash, mount)
# Sign EFI binaries under these directories within the ESP
for esp_dir in ['efi/EFI/BOOT', 'efi/EFI/systemd', 'efi/EFI/Linux']:
- secure_boot_sign(state, state.root / esp_dir, for_cache, cached, mount, replace=True)
+ secure_boot_sign(state, state.root / esp_dir, cached, mount, replace=True)
split_kernel = (
- extract_unified_kernel(state, for_cache, mount)
+ extract_unified_kernel(state, mount)
if state.config.split_artifacts
else None
)
- split_kernel_image, split_initrd = extract_kernel_image_initrd(state, for_cache, mount)
- split_kernel_cmdline = extract_kernel_cmdline(state, for_cache, mount)
+ split_kernel_image, split_initrd = extract_kernel_image_initrd(state, mount)
+ split_kernel_cmdline = extract_kernel_cmdline(state, mount)
- archive = make_tar(state, for_cache) or \
- make_cpio(state, for_cache)
+ archive = make_tar(state) or make_cpio(state)
return BuildOutput(
raw or generated_root,
cache=cache,
do_run_build_script=False,
machine_id=config.machine_id or uuid.uuid4().hex,
+ for_cache=False,
)
# If caching is requested, then make sure we have cache images around we can make use of
if config.build_script:
with complete_step("Running first (development) stage to generate cached copy…"):
# Generate the cache version of the build image, and store it as "cache-pre-dev"
- state.do_run_build_script = True
- image = build_image(state, for_cache=True)
+ state = dataclasses.replace(state, do_run_build_script=True, for_cache=True)
+ image = build_image(state)
save_cache(state, image.raw_name(), state.cache_pre_dev)
remove_artifacts(state, image.raw, image.archive)
with complete_step("Running second (final) stage to generate cached copy…"):
# Generate the cache version of the build image, and store it as "cache-pre-inst"
- state.do_run_build_script = False
- image = build_image(state, for_cache=True)
+ state = dataclasses.replace(state, do_run_build_script=False, for_cache=True)
+ image = build_image(state)
save_cache(state, image.raw_name(), state.cache_pre_inst)
remove_artifacts(state, image.raw, image.archive)
if config.build_script:
with complete_step("Running first (development) stage…"):
# Run the image builder for the first (development) stage in preparation for the build script
- state.do_run_build_script = True
+ state = dataclasses.replace(state, do_run_build_script=True, for_cache=False)
image = build_image(state)
run_build_script(state, image.raw)
# Run the image builder for the second (final) stage
if not config.skip_final_phase:
with complete_step("Running second (final) stage…"):
- state.do_run_build_script = False
+ state = dataclasses.replace(state, do_run_build_script=False, for_cache=False)
image = build_image(state, manifest=manifest, cleanup=True)
else:
MkosiPrinter.print_step("Skipping (second) final image build phase.")