From: Daan De Meyer Date: Fri, 21 Apr 2023 20:38:55 +0000 (+0200) Subject: Move MkosiConfig and load_args() and config.py X-Git-Tag: v15~207^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8ebb32721abaaa10947814377a0526719cc005c9;p=thirdparty%2Fmkosi.git Move MkosiConfig and load_args() and config.py And all the other related changes to make that work. --- diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 6262725dd..0899115aa 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -13,7 +13,6 @@ import itertools import json import logging import os -import platform import resource import shutil import subprocess @@ -25,9 +24,10 @@ from pathlib import Path from textwrap import dedent from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast +from mkosi.config import GenericVersion, MkosiConfig, machine_name from mkosi.install import add_dropin_config_from_resource, copy_path, flock from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step -from mkosi.manifest import GenericVersion, Manifest +from mkosi.manifest import Manifest from mkosi.mounts import dissect_and_mount, mount_overlay, scandir_recursive from mkosi.pager import page from mkosi.remove import unlink_try_hard @@ -45,34 +45,24 @@ from mkosi.util import ( Compression, Distribution, ManifestFormat, - MkosiConfig, OutputFormat, Verb, current_user, flatten, format_rlimit, - is_dnf_distribution, patch_file, + prepend_to_environ_path, set_umask, tmp_dir, ) MKOSI_COMMANDS_NEED_BUILD = (Verb.shell, Verb.boot, Verb.qemu, Verb.serve) MKOSI_COMMANDS_SUDO = (Verb.shell, Verb.boot) -MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh) T = TypeVar("T") -def list_to_string(seq: Iterator[str]) -> str: - """Print contents of a list to a comma-separated string - - ['a', "b", 11] → "'a', 'b', 11" - """ - return str(list(seq))[1:-1] - - # EFI has its own conventions too EFI_ARCHITECTURES = { "x86_64": "x64", @@ -1032,250 +1022,6 @@ def unlink_output(config: MkosiConfig) -> None: empty_directory(config.cache_dir) -def require_private_file(name: Path, description: str) -> None: - mode = os.stat(name).st_mode & 0o777 - if mode & 0o007: - logging.warning(dedent(f"""\ - Permissions of '{name}' of '{mode:04o}' are too open. - When creating {description} files use an access mode that restricts access to the owner only. - """)) - - -def find_password(args: argparse.Namespace) -> None: - if args.password is not None: - return - - try: - pwfile = Path("mkosi.rootpw") - require_private_file(pwfile, "root password") - - args.password = pwfile.read_text().strip() - - except FileNotFoundError: - pass - - -def find_image_version(args: argparse.Namespace) -> None: - if args.image_version is not None: - return - - try: - with open("mkosi.version") as f: - args.image_version = f.read().strip() - except FileNotFoundError: - pass - - -def load_credentials(args: argparse.Namespace) -> dict[str, str]: - creds = {} - - d = Path("mkosi.credentials") - if d.is_dir(): - for e in d.iterdir(): - if os.access(e, os.X_OK): - creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout - else: - creds[e.name] = e.read_text() - - for s in args.credentials: - key, _, value = s.partition("=") - creds[key] = value - - if "firstboot.timezone" not in creds: - tz = run( - ["timedatectl", "show", "-p", "Timezone", "--value"], - text=True, - stdout=subprocess.PIPE, - ).stdout.strip() - creds["firstboot.timezone"] = tz - - if "firstboot.locale" not in creds: - creds["firstboot.locale"] = "C.UTF-8" - - if "firstboot.hostname" not in creds: - creds["firstboot.hostname"] = machine_name(args) - - if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ: - key = run( - ["ssh-add", "-L"], - text=True, - stdout=subprocess.PIPE, - env=os.environ, - ).stdout.strip() - creds["ssh.authorized_keys.root"] = key - - return creds - - -def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]: - columns, lines = shutil.get_terminal_size() - - cmdline = [ - f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}", - f"systemd.tty.columns.hvc0={columns}", - f"systemd.tty.rows.hvc0={lines}", - f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}", - f"systemd.tty.columns.ttyS0={columns}", - f"systemd.tty.rows.ttyS0={lines}", - "console=hvc0", - ] - - if args.output_format == OutputFormat.cpio: - cmdline += ["rd.systemd.unit=default.target"] - - for s in args.kernel_command_line_extra: - key, sep, value = s.partition("=") - if " " in value: - value = f'"{value}"' - cmdline += [key if not sep else f"{key}={value}"] - - return cmdline - - -def load_args(args: argparse.Namespace) -> MkosiConfig: - ARG_DEBUG.update(args.debug) - - find_image_version(args) - - if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE: - die(f"Parameters after verb are only accepted for {list_to_string(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.") - - if args.verb == Verb.qemu and args.output_format in ( - OutputFormat.directory, - OutputFormat.subvolume, - OutputFormat.tar, - ): - die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.") - - if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context: - die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context") - - if args.cache_dir: - args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}" - if args.build_dir: - args.build_dir = args.build_dir / f"{args.distribution}~{args.release}" - if args.output_dir: - args.output_dir = args.output_dir / f"{args.distribution}~{args.release}" - - if args.mirror is None: - if args.distribution in (Distribution.fedora, Distribution.centos): - args.mirror = None - elif args.distribution == Distribution.debian: - args.mirror = "http://deb.debian.org/debian" - elif args.distribution == Distribution.ubuntu: - if args.architecture == "x86" or args.architecture == "x86_64": - args.mirror = "http://archive.ubuntu.com/ubuntu" - else: - args.mirror = "http://ports.ubuntu.com" - elif args.distribution == Distribution.arch: - if args.architecture == "aarch64": - args.mirror = "http://mirror.archlinuxarm.org" - else: - args.mirror = "https://geo.mirror.pkgbuild.com" - elif args.distribution == Distribution.opensuse: - args.mirror = "https://download.opensuse.org" - elif args.distribution == Distribution.rocky: - args.mirror = None - elif args.distribution == Distribution.alma: - args.mirror = None - - if args.sign: - args.checksum = True - - if args.compress_output is None: - args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none - - if args.output is None: - iid = args.image_id if args.image_id is not None else "image" - prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid - - if args.output_format == OutputFormat.disk: - output = f"{prefix}.raw" - elif args.output_format == OutputFormat.tar: - output = f"{prefix}.tar" - elif args.output_format == OutputFormat.cpio: - output = f"{prefix}.cpio" - else: - output = prefix - args.output = Path(output) - - if args.output_dir is not None: - if "/" not in str(args.output): - args.output = args.output_dir / args.output - else: - logging.warning("Ignoring configured output directory as output file is a qualified path.") - - args.output = args.output.absolute() - - if args.environment: - env = {} - for s in args.environment: - key, sep, value = s.partition("=") - value = value if sep else os.getenv(key, "") - env[key] = value - args.environment = env - else: - args.environment = {} - - args.credentials = load_credentials(args) - args.kernel_command_line_extra = load_kernel_command_line_extra(args) - - if args.secure_boot and args.verb != Verb.genkey: - if args.secure_boot_key is None: - die("UEFI SecureBoot enabled, but couldn't find private key.", - hint="Consider placing it in mkosi.secure-boot.key") - - if args.secure_boot_certificate is None: - die("UEFI SecureBoot enabled, but couldn't find certificate.", - hint="Consider placing it in mkosi.secure-boot.crt") - - if args.sign_expected_pcr is True and not shutil.which("systemd-measure"): - die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.") - - if args.sign_expected_pcr is None: - args.sign_expected_pcr = bool(shutil.which("systemd-measure")) - - # Resolve passwords late so we can accurately determine whether a build is needed - find_password(args) - - if args.verb in (Verb.shell, Verb.boot): - opname = "acquire shell" if args.verb == Verb.shell else "boot" - if args.output_format in (OutputFormat.tar, OutputFormat.cpio): - die(f"Sorry, can't {opname} with a {args.output_format} archive.") - if args.compress_output != Compression.none: - die(f"Sorry, can't {opname} with a compressed image.") - - if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch): - die("--repo-dir is only supported on DNF based distributions and Arch") - - if args.qemu_kvm is True and not qemu_check_kvm_support(): - die("Sorry, the host machine does not support KVM acceleration.") - - if args.qemu_kvm is None: - args.qemu_kvm = qemu_check_kvm_support() - - if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu): - die("Sorry, the --repositories option is only supported on DNF/Debian based distributions") - - if args.initrds: - args.initrds = [p.absolute() for p in args.initrds] - for p in args.initrds: - if not p.exists(): - die(f"Initrd {p} not found") - if not p.is_file(): - die(f"Initrd {p} is not a file") - - if args.overlay and not args.base_trees: - die("--overlay can only be used with --base-tree") - - # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later. - with prepend_to_environ_path(args.extra_search_paths): - if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0: - die("This unprivileged build configuration requires at least Linux v5.11") - - return MkosiConfig(**vars(args)) - - def cache_tree_paths(config: MkosiConfig) -> tuple[Path, Path]: # If the image ID is specified, use cache file names that are independent of the image versions, so that @@ -1950,10 +1696,6 @@ def check_root() -> None: die("Must be invoked as root.") -def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str: - return config.image_id or config.output.with_suffix("").name.partition("_")[0] - - def machine_cid(config: MkosiConfig) -> int: cid = int.from_bytes(hashlib.sha256(machine_name(config).encode()).digest()[:4], byteorder='little') # Make sure we don't return any of the well-known CIDs. @@ -2129,18 +1871,6 @@ def find_ovmf_vars(config: MkosiConfig) -> Path: die("Couldn't find OVMF UEFI variables file.") -def qemu_check_kvm_support() -> bool: - kvm = Path("/dev/kvm") - if not kvm.is_char_device(): - return False - # some CI runners may present a non-working KVM device - try: - with kvm.open("r+b"): - return True - except OSError: - return False - - @contextlib.contextmanager def start_swtpm() -> Iterator[Optional[Path]]: @@ -2373,27 +2103,6 @@ def bump_image_version(config: MkosiConfig) -> None: Path("mkosi.version").write_text(new_version + "\n") -@contextlib.contextmanager -def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]: - if not paths: - yield - return - - with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d: - - for path in paths: - if not path.is_dir(): - Path(d).joinpath(path.name).symlink_to(path.absolute()) - - paths = [Path(d), *paths] - - news = [os.fspath(path) for path in paths if path.is_dir()] - olds = os.getenv("PATH", "").split(":") - os.environ["PATH"] = ":".join(news + olds) - - yield - - def expand_specifier(s: str) -> str: return s.replace("%u", current_user().name) diff --git a/mkosi/__main__.py b/mkosi/__main__.py index fae05a14b..f80aa30f1 100644 --- a/mkosi/__main__.py +++ b/mkosi/__main__.py @@ -8,8 +8,8 @@ import subprocess import sys from collections.abc import Iterator -from mkosi import load_args, run_verb -from mkosi.config import MkosiConfigParser +from mkosi import run_verb +from mkosi.config import MkosiConfigParser, load_args from mkosi.log import ARG_DEBUG, die, log_setup from mkosi.run import excepthook diff --git a/mkosi/config.py b/mkosi/config.py index eba8ae329..a99d2e67c 100644 --- a/mkosi/config.py +++ b/mkosi/config.py @@ -4,17 +4,21 @@ import dataclasses import enum import fnmatch import functools +import logging import os.path import platform import shlex +import shutil +import subprocess import sys import textwrap from collections.abc import Sequence from pathlib import Path from typing import Any, Callable, Optional, Type, Union, cast -from mkosi.log import Style, die +from mkosi.log import ARG_DEBUG, Style, die from mkosi.pager import page +from mkosi.run import run from mkosi.util import ( Compression, Distribution, @@ -25,10 +29,14 @@ from mkosi.util import ( current_user, detect_distribution, flatten, + is_dnf_distribution, + prepend_to_environ_path, + qemu_check_kvm_support, ) __version__ = "14" +MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh) ConfigParseCallback = Callable[[str, Optional[str], argparse.Namespace], Any] ConfigMatchCallback = Callable[[str, str, argparse.Namespace], bool] @@ -1463,3 +1471,441 @@ class MkosiConfigParser: setattr(namespace, s.dest, default) return namespace + + + +@functools.total_ordering +class GenericVersion: + def __init__(self, version: str): + self._version = version + + def __eq__(self, other: object) -> bool: + if not isinstance(other, GenericVersion): + return False + cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version] + return run(cmd, check=False).returncode == 0 + + def __lt__(self, other: object) -> bool: + if not isinstance(other, GenericVersion): + return False + cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version] + return run(cmd, check=False).returncode == 0 + + + +@dataclasses.dataclass(frozen=True) +class MkosiConfig: + """Type-hinted storage for command line arguments. + + Only user configuration is stored here while dynamic state exists in + MkosiState. If a field of the same name exists in both classes always + access the value from state. + """ + + verb: Verb + cmdline: list[str] + force: int + + distribution: Distribution + release: str + mirror: Optional[str] + local_mirror: Optional[str] + repository_key_check: bool + repositories: list[str] + repo_dirs: list[Path] + repart_dirs: list[Path] + overlay: bool + architecture: str + output_format: OutputFormat + manifest_format: list[ManifestFormat] + output: Path + output_dir: Optional[Path] + kernel_command_line: list[str] + secure_boot: bool + secure_boot_key: Optional[Path] + secure_boot_certificate: Optional[Path] + secure_boot_valid_days: str + secure_boot_common_name: str + sign_expected_pcr: bool + compress_output: Compression + image_version: Optional[str] + image_id: Optional[str] + tar_strip_selinux_context: bool + incremental: bool + cache_initrd: bool + packages: list[str] + remove_packages: list[str] + with_docs: bool + with_tests: bool + cache_dir: Optional[Path] + base_trees: list[Path] + extra_trees: list[tuple[Path, Optional[Path]]] + skeleton_trees: list[tuple[Path, Optional[Path]]] + clean_package_metadata: Optional[bool] + remove_files: list[str] + environment: dict[str, str] + build_sources: Path + build_dir: Optional[Path] + install_dir: Optional[Path] + build_packages: list[str] + build_script: Optional[Path] + prepare_script: Optional[Path] + postinst_script: Optional[Path] + finalize_script: Optional[Path] + with_network: bool + cache_only: bool + nspawn_settings: Optional[Path] + checksum: bool + split_artifacts: bool + sign: bool + key: Optional[str] + password: Optional[str] + password_is_hashed: bool + autologin: bool + extra_search_paths: list[Path] + ephemeral: bool + ssh: bool + credentials: dict[str, str] + directory: Optional[Path] + debug: list[str] + auto_bump: bool + workspace_dir: Optional[Path] + initrds: list[Path] + make_initrd: bool + kernel_command_line_extra: list[str] + acl: bool + pager: bool + bootable: Optional[bool] + + # QEMU-specific options + qemu_gui: bool + qemu_smp: str + qemu_mem: str + qemu_kvm: bool + qemu_args: Sequence[str] + + passphrase: Optional[Path] + + def architecture_is_native(self) -> bool: + return self.architecture == platform.machine() + + @property + def output_split_uki(self) -> Path: + return build_auxiliary_output_path(self, ".efi") + + @property + def output_split_kernel(self) -> Path: + return build_auxiliary_output_path(self, ".vmlinuz") + + @property + def output_nspawn_settings(self) -> Path: + return build_auxiliary_output_path(self, ".nspawn") + + @property + def output_checksum(self) -> Path: + return Path("SHA256SUMS") + + @property + def output_signature(self) -> Path: + return Path("SHA256SUMS.gpg") + + @property + def output_sshkey(self) -> Path: + return build_auxiliary_output_path(self, ".ssh") + + @property + def output_manifest(self) -> Path: + return build_auxiliary_output_path(self, ".manifest") + + @property + def output_changelog(self) -> Path: + return build_auxiliary_output_path(self, ".changelog") + + @property + def output_compressed(self) -> Path: + if self.compress_output == Compression.none: + return self.output + + return self.output.parent / f"{self.output.name}.{self.compress_output.name}" + + def output_paths(self) -> tuple[Path, ...]: + return ( + self.output, + self.output_split_uki, + self.output_split_kernel, + self.output_nspawn_settings, + self.output_checksum, + self.output_signature, + self.output_sshkey, + self.output_manifest, + self.output_changelog, + ) + + + +def strip_suffixes(path: Path) -> Path: + while path.suffix in { + ".xz", + ".zstd", + ".raw", + ".tar", + ".cpio", + }: + path = path.with_suffix("") + + return path + + +def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path: + output = strip_suffixes(args.output) + return output.with_name(f"{output.name}{suffix}") + + +def find_image_version(args: argparse.Namespace) -> None: + if args.image_version is not None: + return + + try: + with open("mkosi.version") as f: + args.image_version = f.read().strip() + except FileNotFoundError: + pass + + +def require_private_file(name: Path, description: str) -> None: + mode = os.stat(name).st_mode & 0o777 + if mode & 0o007: + logging.warning(textwrap.dedent(f"""\ + Permissions of '{name}' of '{mode:04o}' are too open. + When creating {description} files use an access mode that restricts access to the owner only. + """)) + + +def find_password(args: argparse.Namespace) -> None: + if args.password is not None: + return + + try: + pwfile = Path("mkosi.rootpw") + require_private_file(pwfile, "root password") + + args.password = pwfile.read_text().strip() + + except FileNotFoundError: + pass + + + +def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str: + return config.image_id or config.output.with_suffix("").name.partition("_")[0] + + +def load_credentials(args: argparse.Namespace) -> dict[str, str]: + creds = {} + + d = Path("mkosi.credentials") + if d.is_dir(): + for e in d.iterdir(): + if os.access(e, os.X_OK): + creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout + else: + creds[e.name] = e.read_text() + + for s in args.credentials: + key, _, value = s.partition("=") + creds[key] = value + + if "firstboot.timezone" not in creds: + tz = run( + ["timedatectl", "show", "-p", "Timezone", "--value"], + text=True, + stdout=subprocess.PIPE, + ).stdout.strip() + creds["firstboot.timezone"] = tz + + if "firstboot.locale" not in creds: + creds["firstboot.locale"] = "C.UTF-8" + + if "firstboot.hostname" not in creds: + creds["firstboot.hostname"] = machine_name(args) + + if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ: + key = run( + ["ssh-add", "-L"], + text=True, + stdout=subprocess.PIPE, + env=os.environ, + ).stdout.strip() + creds["ssh.authorized_keys.root"] = key + + return creds + + +def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]: + columns, lines = shutil.get_terminal_size() + + cmdline = [ + f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}", + f"systemd.tty.columns.hvc0={columns}", + f"systemd.tty.rows.hvc0={lines}", + f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}", + f"systemd.tty.columns.ttyS0={columns}", + f"systemd.tty.rows.ttyS0={lines}", + "console=hvc0", + ] + + if args.output_format == OutputFormat.cpio: + cmdline += ["rd.systemd.unit=default.target"] + + for s in args.kernel_command_line_extra: + key, sep, value = s.partition("=") + if " " in value: + value = f'"{value}"' + cmdline += [key if not sep else f"{key}={value}"] + + return cmdline + + +def load_args(args: argparse.Namespace) -> MkosiConfig: + ARG_DEBUG.update(args.debug) + + find_image_version(args) + + if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE: + die(f"Parameters after verb are only accepted for {' '.join(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.") + + if args.verb == Verb.qemu and args.output_format in ( + OutputFormat.directory, + OutputFormat.subvolume, + OutputFormat.tar, + ): + die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.") + + if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context: + die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context") + + if args.cache_dir: + args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}" + if args.build_dir: + args.build_dir = args.build_dir / f"{args.distribution}~{args.release}" + if args.output_dir: + args.output_dir = args.output_dir / f"{args.distribution}~{args.release}" + + if args.mirror is None: + if args.distribution in (Distribution.fedora, Distribution.centos): + args.mirror = None + elif args.distribution == Distribution.debian: + args.mirror = "http://deb.debian.org/debian" + elif args.distribution == Distribution.ubuntu: + if args.architecture == "x86" or args.architecture == "x86_64": + args.mirror = "http://archive.ubuntu.com/ubuntu" + else: + args.mirror = "http://ports.ubuntu.com" + elif args.distribution == Distribution.arch: + if args.architecture == "aarch64": + args.mirror = "http://mirror.archlinuxarm.org" + else: + args.mirror = "https://geo.mirror.pkgbuild.com" + elif args.distribution == Distribution.opensuse: + args.mirror = "https://download.opensuse.org" + elif args.distribution == Distribution.rocky: + args.mirror = None + elif args.distribution == Distribution.alma: + args.mirror = None + + if args.sign: + args.checksum = True + + if args.compress_output is None: + args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none + + if args.output is None: + iid = args.image_id if args.image_id is not None else "image" + prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid + + if args.output_format == OutputFormat.disk: + output = f"{prefix}.raw" + elif args.output_format == OutputFormat.tar: + output = f"{prefix}.tar" + elif args.output_format == OutputFormat.cpio: + output = f"{prefix}.cpio" + else: + output = prefix + args.output = Path(output) + + if args.output_dir is not None: + if "/" not in str(args.output): + args.output = args.output_dir / args.output + else: + logging.warning("Ignoring configured output directory as output file is a qualified path.") + + args.output = args.output.absolute() + + if args.environment: + env = {} + for s in args.environment: + key, sep, value = s.partition("=") + value = value if sep else os.getenv(key, "") + env[key] = value + args.environment = env + else: + args.environment = {} + + args.credentials = load_credentials(args) + args.kernel_command_line_extra = load_kernel_command_line_extra(args) + + if args.secure_boot and args.verb != Verb.genkey: + if args.secure_boot_key is None: + die("UEFI SecureBoot enabled, but couldn't find private key.", + hint="Consider placing it in mkosi.secure-boot.key") + + if args.secure_boot_certificate is None: + die("UEFI SecureBoot enabled, but couldn't find certificate.", + hint="Consider placing it in mkosi.secure-boot.crt") + + if args.sign_expected_pcr is True and not shutil.which("systemd-measure"): + die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.") + + if args.sign_expected_pcr is None: + args.sign_expected_pcr = bool(shutil.which("systemd-measure")) + + # Resolve passwords late so we can accurately determine whether a build is needed + find_password(args) + + if args.verb in (Verb.shell, Verb.boot): + opname = "acquire shell" if args.verb == Verb.shell else "boot" + if args.output_format in (OutputFormat.tar, OutputFormat.cpio): + die(f"Sorry, can't {opname} with a {args.output_format} archive.") + if args.compress_output != Compression.none: + die(f"Sorry, can't {opname} with a compressed image.") + + if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch): + die("--repo-dir is only supported on DNF based distributions and Arch") + + if args.qemu_kvm is True and not qemu_check_kvm_support(): + die("Sorry, the host machine does not support KVM acceleration.") + + if args.qemu_kvm is None: + args.qemu_kvm = qemu_check_kvm_support() + + if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu): + die("Sorry, the --repositories option is only supported on DNF/Debian based distributions") + + if args.initrds: + args.initrds = [p.absolute() for p in args.initrds] + for p in args.initrds: + if not p.exists(): + die(f"Initrd {p} not found") + if not p.is_file(): + die(f"Initrd {p} is not a file") + + if args.overlay and not args.base_trees: + die("--overlay can only be used with --base-tree") + + # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later. + with prepend_to_environ_path(args.extra_search_paths): + if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0: + die("This unprivileged build configuration requires at least Linux v5.11") + + return MkosiConfig(**vars(args)) + diff --git a/mkosi/distributions/centos.py b/mkosi/distributions/centos.py index 61b58eb83..6750b88c3 100644 --- a/mkosi/distributions/centos.py +++ b/mkosi/distributions/centos.py @@ -5,12 +5,13 @@ import shutil from collections.abc import Sequence from pathlib import Path +from mkosi.config import MkosiConfig from mkosi.distributions import DistributionInstaller from mkosi.distributions.fedora import Repo, invoke_dnf, setup_dnf from mkosi.log import complete_step, die from mkosi.remove import unlink_try_hard from mkosi.state import MkosiState -from mkosi.util import Distribution, MkosiConfig +from mkosi.util import Distribution def move_rpm_db(root: Path) -> None: diff --git a/mkosi/manifest.py b/mkosi/manifest.py index bda72a4d1..899170492 100644 --- a/mkosi/manifest.py +++ b/mkosi/manifest.py @@ -1,7 +1,6 @@ # SPDX-License-Identifier: LGPL-2.1+ import dataclasses -import functools import json from datetime import datetime from pathlib import Path @@ -9,8 +8,9 @@ from subprocess import DEVNULL, PIPE from textwrap import dedent from typing import IO, Any, Optional +from mkosi.config import MkosiConfig from mkosi.run import run -from mkosi.util import Distribution, ManifestFormat, MkosiConfig, PackageType +from mkosi.util import Distribution, ManifestFormat, PackageType @dataclasses.dataclass @@ -289,21 +289,3 @@ class Manifest: for package in self.source_packages.values(): print(f"\n{80*'-'}\n", file=out) out.write(package.report()) - - -@functools.total_ordering -class GenericVersion: - def __init__(self, version: str): - self._version = version - - def __eq__(self, other: object) -> bool: - if not isinstance(other, GenericVersion): - return False - cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version] - return run(cmd, check=False).returncode == 0 - - def __lt__(self, other: object) -> bool: - if not isinstance(other, GenericVersion): - return False - cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version] - return run(cmd, check=False).returncode == 0 diff --git a/mkosi/mounts.py b/mkosi/mounts.py index 6eef34d41..87ab0e548 100644 --- a/mkosi/mounts.py +++ b/mkosi/mounts.py @@ -9,8 +9,8 @@ from collections.abc import Iterator, Sequence from pathlib import Path from typing import Callable, Deque, Optional, TypeVar, Union, cast +from mkosi.config import GenericVersion from mkosi.log import complete_step -from mkosi.manifest import GenericVersion from mkosi.run import run from mkosi.types import PathString diff --git a/mkosi/state.py b/mkosi/state.py index fa326dad3..386317d4c 100644 --- a/mkosi/state.py +++ b/mkosi/state.py @@ -4,9 +4,9 @@ import dataclasses import importlib from pathlib import Path +from mkosi.config import MkosiConfig from mkosi.distributions import DistributionInstaller from mkosi.log import die -from mkosi.util import MkosiConfig @dataclasses.dataclass diff --git a/mkosi/util.py b/mkosi/util.py index 1a6071f84..281347e4e 100644 --- a/mkosi/util.py +++ b/mkosi/util.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: LGPL-2.1+ -import argparse import ast import contextlib import dataclasses @@ -9,16 +8,16 @@ import functools import getpass import itertools import os -import platform import pwd import re import resource import shutil import sys import tarfile +import tempfile from collections.abc import Iterable, Iterator, Sequence from pathlib import Path -from typing import Any, Callable, Optional, TypeVar, Union +from typing import Any, Callable, Optional, TypeVar from mkosi.log import die @@ -183,174 +182,6 @@ class ManifestFormat(str, enum.Enum): changelog = "changelog" # human-readable text file with package changelogs -KNOWN_SUFFIXES = { - ".xz", - ".zstd", - ".raw", - ".tar", - ".cpio", -} - - -def strip_suffixes(path: Path) -> Path: - while path.suffix in KNOWN_SUFFIXES: - path = path.with_suffix("") - return path - - -@dataclasses.dataclass(frozen=True) -class MkosiConfig: - """Type-hinted storage for command line arguments. - - Only user configuration is stored here while dynamic state exists in - MkosiState. If a field of the same name exists in both classes always - access the value from state. - """ - - verb: Verb - cmdline: list[str] - force: int - - distribution: Distribution - release: str - mirror: Optional[str] - local_mirror: Optional[str] - repository_key_check: bool - repositories: list[str] - repo_dirs: list[Path] - repart_dirs: list[Path] - overlay: bool - architecture: str - output_format: OutputFormat - manifest_format: list[ManifestFormat] - output: Path - output_dir: Optional[Path] - kernel_command_line: list[str] - secure_boot: bool - secure_boot_key: Optional[Path] - secure_boot_certificate: Optional[Path] - secure_boot_valid_days: str - secure_boot_common_name: str - sign_expected_pcr: bool - compress_output: Compression - image_version: Optional[str] - image_id: Optional[str] - tar_strip_selinux_context: bool - incremental: bool - cache_initrd: bool - packages: list[str] - remove_packages: list[str] - with_docs: bool - with_tests: bool - cache_dir: Optional[Path] - base_trees: list[Path] - extra_trees: list[tuple[Path, Optional[Path]]] - skeleton_trees: list[tuple[Path, Optional[Path]]] - clean_package_metadata: Optional[bool] - remove_files: list[str] - environment: dict[str, str] - build_sources: Path - build_dir: Optional[Path] - install_dir: Optional[Path] - build_packages: list[str] - build_script: Optional[Path] - prepare_script: Optional[Path] - postinst_script: Optional[Path] - finalize_script: Optional[Path] - with_network: bool - cache_only: bool - nspawn_settings: Optional[Path] - checksum: bool - split_artifacts: bool - sign: bool - key: Optional[str] - password: Optional[str] - password_is_hashed: bool - autologin: bool - extra_search_paths: list[Path] - ephemeral: bool - ssh: bool - credentials: dict[str, str] - directory: Optional[Path] - debug: list[str] - auto_bump: bool - workspace_dir: Optional[Path] - initrds: list[Path] - make_initrd: bool - kernel_command_line_extra: list[str] - acl: bool - pager: bool - bootable: Optional[bool] - - # QEMU-specific options - qemu_gui: bool - qemu_smp: str - qemu_mem: str - qemu_kvm: bool - qemu_args: Sequence[str] - - passphrase: Optional[Path] - - def architecture_is_native(self) -> bool: - return self.architecture == platform.machine() - - @property - def output_split_uki(self) -> Path: - return build_auxiliary_output_path(self, ".efi") - - @property - def output_split_kernel(self) -> Path: - return build_auxiliary_output_path(self, ".vmlinuz") - - @property - def output_nspawn_settings(self) -> Path: - return build_auxiliary_output_path(self, ".nspawn") - - @property - def output_checksum(self) -> Path: - return Path("SHA256SUMS") - - @property - def output_signature(self) -> Path: - return Path("SHA256SUMS.gpg") - - @property - def output_sshkey(self) -> Path: - return build_auxiliary_output_path(self, ".ssh") - - @property - def output_manifest(self) -> Path: - return build_auxiliary_output_path(self, ".manifest") - - @property - def output_changelog(self) -> Path: - return build_auxiliary_output_path(self, ".changelog") - - @property - def output_compressed(self) -> Path: - if self.compress_output == Compression.none: - return self.output - - return self.output.parent / f"{self.output.name}.{self.compress_output.name}" - - def output_paths(self) -> tuple[Path, ...]: - return ( - self.output, - self.output_split_uki, - self.output_split_kernel, - self.output_nspawn_settings, - self.output_checksum, - self.output_signature, - self.output_sshkey, - self.output_manifest, - self.output_changelog, - ) - - -def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path: - output = strip_suffixes(args.output) - return output.with_name(f"{output.name}{suffix}") - def format_rlimit(rlimit: int) -> str: limits = resource.getrlimit(rlimit) @@ -473,3 +304,36 @@ def chdir(directory: Path) -> Iterator[None]: yield finally: os.chdir(old) + + +@contextlib.contextmanager +def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]: + if not paths: + yield + return + + with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d: + + for path in paths: + if not path.is_dir(): + Path(d).joinpath(path.name).symlink_to(path.absolute()) + + paths = [Path(d), *paths] + + news = [os.fspath(path) for path in paths if path.is_dir()] + olds = os.getenv("PATH", "").split(":") + os.environ["PATH"] = ":".join(news + olds) + + yield + + +def qemu_check_kvm_support() -> bool: + kvm = Path("/dev/kvm") + if not kvm.is_char_device(): + return False + # some CI runners may present a non-working KVM device + try: + with kvm.open("r+b"): + return True + except OSError: + return False diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 000000000..26350c360 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: LGPL-2.1+ + +from pathlib import Path + +from mkosi.config import strip_suffixes + + +def test_strip_suffixes() -> None: + assert strip_suffixes(Path("home/test.zstd")) == Path("home/test") + assert strip_suffixes(Path("home/test.xz")) == Path("home/test") + assert strip_suffixes(Path("home/test.raw")) == Path("home/test") + assert strip_suffixes(Path("home/test.tar")) == Path("home/test") + assert strip_suffixes(Path("home/test.cpio")) == Path("home/test") + assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test") + assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test") + assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt") diff --git a/tests/test_parse_load_args.py b/tests/test_parse_load_args.py index 38bd63f9a..9a06e66ff 100644 --- a/tests/test_parse_load_args.py +++ b/tests/test_parse_load_args.py @@ -9,9 +9,8 @@ from typing import Iterator, List, Optional import pytest -import mkosi -from mkosi.util import Compression, Distribution, MkosiConfig, Verb -from mkosi.config import MkosiConfigParser +from mkosi.util import Compression, Distribution, Verb +from mkosi.config import MkosiConfigParser, MkosiConfig, load_args @contextmanager @@ -27,7 +26,7 @@ def cd_temp_dir() -> Iterator[None]: def parse(argv: Optional[List[str]] = None) -> MkosiConfig: - return mkosi.load_args(MkosiConfigParser().parse(argv)) + return load_args(MkosiConfigParser().parse(argv)) def test_parse_load_verb() -> None: diff --git a/tests/test_util.py b/tests/test_util.py index 448aa0596..614df2cc6 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -12,7 +12,6 @@ from mkosi.util import ( PackageType, safe_tar_extract, set_umask, - strip_suffixes, ) def test_distribution() -> None: @@ -59,13 +58,3 @@ def test_safe_tar_extract(tmp_path: Path) -> None: assert not (evil_target / name).exists() assert not (Path("/tmp") / name).exists() - -def test_strip_suffixes() -> None: - assert strip_suffixes(Path("home/test.zstd")) == Path("home/test") - assert strip_suffixes(Path("home/test.xz")) == Path("home/test") - assert strip_suffixes(Path("home/test.raw")) == Path("home/test") - assert strip_suffixes(Path("home/test.tar")) == Path("home/test") - assert strip_suffixes(Path("home/test.cpio")) == Path("home/test") - assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test") - assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test") - assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")