]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Move MkosiConfig and load_args() and config.py
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Fri, 21 Apr 2023 20:38:55 +0000 (22:38 +0200)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Mon, 24 Apr 2023 10:44:50 +0000 (12:44 +0200)
And all the other related changes to make that work.

mkosi/__init__.py
mkosi/__main__.py
mkosi/config.py
mkosi/distributions/centos.py
mkosi/manifest.py
mkosi/mounts.py
mkosi/state.py
mkosi/util.py
tests/test_config.py [new file with mode: 0644]
tests/test_parse_load_args.py
tests/test_util.py

index 6262725dd14b3f896f9ff8dede2a6e94f39de418..0899115aacb37d411867df0ab41c4a1ba5c73325 100644 (file)
@@ -13,7 +13,6 @@ import itertools
 import json
 import logging
 import os
-import platform
 import resource
 import shutil
 import subprocess
@@ -25,9 +24,10 @@ from pathlib import Path
 from textwrap import dedent
 from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast
 
+from mkosi.config import GenericVersion, MkosiConfig, machine_name
 from mkosi.install import add_dropin_config_from_resource, copy_path, flock
 from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step
-from mkosi.manifest import GenericVersion, Manifest
+from mkosi.manifest import Manifest
 from mkosi.mounts import dissect_and_mount, mount_overlay, scandir_recursive
 from mkosi.pager import page
 from mkosi.remove import unlink_try_hard
@@ -45,34 +45,24 @@ from mkosi.util import (
     Compression,
     Distribution,
     ManifestFormat,
-    MkosiConfig,
     OutputFormat,
     Verb,
     current_user,
     flatten,
     format_rlimit,
-    is_dnf_distribution,
     patch_file,
+    prepend_to_environ_path,
     set_umask,
     tmp_dir,
 )
 
 MKOSI_COMMANDS_NEED_BUILD = (Verb.shell, Verb.boot, Verb.qemu, Verb.serve)
 MKOSI_COMMANDS_SUDO = (Verb.shell, Verb.boot)
-MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh)
 
 
 T = TypeVar("T")
 
 
-def list_to_string(seq: Iterator[str]) -> str:
-    """Print contents of a list to a comma-separated string
-
-    ['a', "b", 11] → "'a', 'b', 11"
-    """
-    return str(list(seq))[1:-1]
-
-
 # EFI has its own conventions too
 EFI_ARCHITECTURES = {
     "x86_64": "x64",
@@ -1032,250 +1022,6 @@ def unlink_output(config: MkosiConfig) -> None:
                 empty_directory(config.cache_dir)
 
 
-def require_private_file(name: Path, description: str) -> None:
-    mode = os.stat(name).st_mode & 0o777
-    if mode & 0o007:
-        logging.warning(dedent(f"""\
-            Permissions of '{name}' of '{mode:04o}' are too open.
-            When creating {description} files use an access mode that restricts access to the owner only.
-        """))
-
-
-def find_password(args: argparse.Namespace) -> None:
-    if args.password is not None:
-        return
-
-    try:
-        pwfile = Path("mkosi.rootpw")
-        require_private_file(pwfile, "root password")
-
-        args.password = pwfile.read_text().strip()
-
-    except FileNotFoundError:
-        pass
-
-
-def find_image_version(args: argparse.Namespace) -> None:
-    if args.image_version is not None:
-        return
-
-    try:
-        with open("mkosi.version") as f:
-            args.image_version = f.read().strip()
-    except FileNotFoundError:
-        pass
-
-
-def load_credentials(args: argparse.Namespace) -> dict[str, str]:
-    creds = {}
-
-    d = Path("mkosi.credentials")
-    if d.is_dir():
-        for e in d.iterdir():
-            if os.access(e, os.X_OK):
-                creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout
-            else:
-                creds[e.name] = e.read_text()
-
-    for s in args.credentials:
-        key, _, value = s.partition("=")
-        creds[key] = value
-
-    if "firstboot.timezone" not in creds:
-        tz = run(
-            ["timedatectl", "show", "-p", "Timezone", "--value"],
-            text=True,
-            stdout=subprocess.PIPE,
-        ).stdout.strip()
-        creds["firstboot.timezone"] = tz
-
-    if "firstboot.locale" not in creds:
-        creds["firstboot.locale"] = "C.UTF-8"
-
-    if "firstboot.hostname" not in creds:
-        creds["firstboot.hostname"] = machine_name(args)
-
-    if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ:
-        key = run(
-            ["ssh-add", "-L"],
-            text=True,
-            stdout=subprocess.PIPE,
-            env=os.environ,
-        ).stdout.strip()
-        creds["ssh.authorized_keys.root"] = key
-
-    return creds
-
-
-def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]:
-    columns, lines = shutil.get_terminal_size()
-
-    cmdline = [
-        f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}",
-        f"systemd.tty.columns.hvc0={columns}",
-        f"systemd.tty.rows.hvc0={lines}",
-        f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}",
-        f"systemd.tty.columns.ttyS0={columns}",
-        f"systemd.tty.rows.ttyS0={lines}",
-        "console=hvc0",
-    ]
-
-    if args.output_format == OutputFormat.cpio:
-        cmdline += ["rd.systemd.unit=default.target"]
-
-    for s in args.kernel_command_line_extra:
-        key, sep, value = s.partition("=")
-        if " " in value:
-            value = f'"{value}"'
-        cmdline += [key if not sep else f"{key}={value}"]
-
-    return cmdline
-
-
-def load_args(args: argparse.Namespace) -> MkosiConfig:
-    ARG_DEBUG.update(args.debug)
-
-    find_image_version(args)
-
-    if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
-        die(f"Parameters after verb are only accepted for {list_to_string(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.")
-
-    if args.verb == Verb.qemu and args.output_format in (
-        OutputFormat.directory,
-        OutputFormat.subvolume,
-        OutputFormat.tar,
-    ):
-        die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.")
-
-    if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context:
-        die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context")
-
-    if args.cache_dir:
-        args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}"
-    if args.build_dir:
-        args.build_dir = args.build_dir / f"{args.distribution}~{args.release}"
-    if args.output_dir:
-        args.output_dir = args.output_dir / f"{args.distribution}~{args.release}"
-
-    if args.mirror is None:
-        if args.distribution in (Distribution.fedora, Distribution.centos):
-            args.mirror = None
-        elif args.distribution == Distribution.debian:
-            args.mirror = "http://deb.debian.org/debian"
-        elif args.distribution == Distribution.ubuntu:
-            if args.architecture == "x86" or args.architecture == "x86_64":
-                args.mirror = "http://archive.ubuntu.com/ubuntu"
-            else:
-                args.mirror = "http://ports.ubuntu.com"
-        elif args.distribution == Distribution.arch:
-            if args.architecture == "aarch64":
-                args.mirror = "http://mirror.archlinuxarm.org"
-            else:
-                args.mirror = "https://geo.mirror.pkgbuild.com"
-        elif args.distribution == Distribution.opensuse:
-            args.mirror = "https://download.opensuse.org"
-        elif args.distribution == Distribution.rocky:
-            args.mirror = None
-        elif args.distribution == Distribution.alma:
-            args.mirror = None
-
-    if args.sign:
-        args.checksum = True
-
-    if args.compress_output is None:
-        args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none
-
-    if args.output is None:
-        iid = args.image_id if args.image_id is not None else "image"
-        prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid
-
-        if args.output_format == OutputFormat.disk:
-            output = f"{prefix}.raw"
-        elif args.output_format == OutputFormat.tar:
-            output = f"{prefix}.tar"
-        elif args.output_format == OutputFormat.cpio:
-            output = f"{prefix}.cpio"
-        else:
-            output = prefix
-        args.output = Path(output)
-
-    if args.output_dir is not None:
-        if "/" not in str(args.output):
-            args.output = args.output_dir / args.output
-        else:
-            logging.warning("Ignoring configured output directory as output file is a qualified path.")
-
-    args.output = args.output.absolute()
-
-    if args.environment:
-        env = {}
-        for s in args.environment:
-            key, sep, value = s.partition("=")
-            value = value if sep else os.getenv(key, "")
-            env[key] = value
-        args.environment = env
-    else:
-        args.environment = {}
-
-    args.credentials = load_credentials(args)
-    args.kernel_command_line_extra = load_kernel_command_line_extra(args)
-
-    if args.secure_boot and args.verb != Verb.genkey:
-        if args.secure_boot_key is None:
-            die("UEFI SecureBoot enabled, but couldn't find private key.",
-                hint="Consider placing it in mkosi.secure-boot.key")
-
-        if args.secure_boot_certificate is None:
-            die("UEFI SecureBoot enabled, but couldn't find certificate.",
-                hint="Consider placing it in mkosi.secure-boot.crt")
-
-    if args.sign_expected_pcr is True and not shutil.which("systemd-measure"):
-        die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.")
-
-    if args.sign_expected_pcr is None:
-        args.sign_expected_pcr = bool(shutil.which("systemd-measure"))
-
-    # Resolve passwords late so we can accurately determine whether a build is needed
-    find_password(args)
-
-    if args.verb in (Verb.shell, Verb.boot):
-        opname = "acquire shell" if args.verb == Verb.shell else "boot"
-        if args.output_format in (OutputFormat.tar, OutputFormat.cpio):
-            die(f"Sorry, can't {opname} with a {args.output_format} archive.")
-        if args.compress_output != Compression.none:
-            die(f"Sorry, can't {opname} with a compressed image.")
-
-    if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch):
-        die("--repo-dir is only supported on DNF based distributions and Arch")
-
-    if args.qemu_kvm is True and not qemu_check_kvm_support():
-        die("Sorry, the host machine does not support KVM acceleration.")
-
-    if args.qemu_kvm is None:
-        args.qemu_kvm = qemu_check_kvm_support()
-
-    if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu):
-        die("Sorry, the --repositories option is only supported on DNF/Debian based distributions")
-
-    if args.initrds:
-        args.initrds = [p.absolute() for p in args.initrds]
-        for p in args.initrds:
-            if not p.exists():
-                die(f"Initrd {p} not found")
-            if not p.is_file():
-                die(f"Initrd {p} is not a file")
-
-    if args.overlay and not args.base_trees:
-        die("--overlay can only be used with --base-tree")
-
-    # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
-    with prepend_to_environ_path(args.extra_search_paths):
-        if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
-            die("This unprivileged build configuration requires at least Linux v5.11")
-
-    return MkosiConfig(**vars(args))
-
-
 def cache_tree_paths(config: MkosiConfig) -> tuple[Path, Path]:
 
     # If the image ID is specified, use cache file names that are independent of the image versions, so that
@@ -1950,10 +1696,6 @@ def check_root() -> None:
         die("Must be invoked as root.")
 
 
-def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
-    return config.image_id or config.output.with_suffix("").name.partition("_")[0]
-
-
 def machine_cid(config: MkosiConfig) -> int:
     cid = int.from_bytes(hashlib.sha256(machine_name(config).encode()).digest()[:4], byteorder='little')
     # Make sure we don't return any of the well-known CIDs.
@@ -2129,18 +1871,6 @@ def find_ovmf_vars(config: MkosiConfig) -> Path:
     die("Couldn't find OVMF UEFI variables file.")
 
 
-def qemu_check_kvm_support() -> bool:
-    kvm = Path("/dev/kvm")
-    if not kvm.is_char_device():
-        return False
-    # some CI runners may present a non-working KVM device
-    try:
-        with kvm.open("r+b"):
-            return True
-    except OSError:
-        return False
-
-
 @contextlib.contextmanager
 def start_swtpm() -> Iterator[Optional[Path]]:
 
@@ -2373,27 +2103,6 @@ def bump_image_version(config: MkosiConfig) -> None:
     Path("mkosi.version").write_text(new_version + "\n")
 
 
-@contextlib.contextmanager
-def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]:
-    if not paths:
-        yield
-        return
-
-    with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d:
-
-        for path in paths:
-            if not path.is_dir():
-                Path(d).joinpath(path.name).symlink_to(path.absolute())
-
-        paths = [Path(d), *paths]
-
-        news = [os.fspath(path) for path in paths if path.is_dir()]
-        olds = os.getenv("PATH", "").split(":")
-        os.environ["PATH"] = ":".join(news + olds)
-
-        yield
-
-
 def expand_specifier(s: str) -> str:
     return s.replace("%u", current_user().name)
 
index fae05a14b5d074e4e4a124575d0a0c98abd35225..f80aa30f192684e36fedde349fcf00754b3cd815 100644 (file)
@@ -8,8 +8,8 @@ import subprocess
 import sys
 from collections.abc import Iterator
 
-from mkosi import load_args, run_verb
-from mkosi.config import MkosiConfigParser
+from mkosi import run_verb
+from mkosi.config import MkosiConfigParser, load_args
 from mkosi.log import ARG_DEBUG, die, log_setup
 from mkosi.run import excepthook
 
index eba8ae329405ce88d7ef9d5f7bf626f36c631589..a99d2e67ced13c6c4b7b873ff6d6d20919e8495a 100644 (file)
@@ -4,17 +4,21 @@ import dataclasses
 import enum
 import fnmatch
 import functools
+import logging
 import os.path
 import platform
 import shlex
+import shutil
+import subprocess
 import sys
 import textwrap
 from collections.abc import Sequence
 from pathlib import Path
 from typing import Any, Callable, Optional, Type, Union, cast
 
-from mkosi.log import Style, die
+from mkosi.log import ARG_DEBUG, Style, die
 from mkosi.pager import page
+from mkosi.run import run
 from mkosi.util import (
     Compression,
     Distribution,
@@ -25,10 +29,14 @@ from mkosi.util import (
     current_user,
     detect_distribution,
     flatten,
+    is_dnf_distribution,
+    prepend_to_environ_path,
+    qemu_check_kvm_support,
 )
 
 __version__ = "14"
 
+MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh)
 
 ConfigParseCallback = Callable[[str, Optional[str], argparse.Namespace], Any]
 ConfigMatchCallback = Callable[[str, str, argparse.Namespace], bool]
@@ -1463,3 +1471,441 @@ class MkosiConfigParser:
             setattr(namespace, s.dest, default)
 
         return namespace
+
+
+
+@functools.total_ordering
+class GenericVersion:
+    def __init__(self, version: str):
+        self._version = version
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, GenericVersion):
+            return False
+        cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version]
+        return run(cmd, check=False).returncode == 0
+
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, GenericVersion):
+            return False
+        cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version]
+        return run(cmd, check=False).returncode == 0
+
+
+
+@dataclasses.dataclass(frozen=True)
+class MkosiConfig:
+    """Type-hinted storage for command line arguments.
+
+    Only user configuration is stored here while dynamic state exists in
+    MkosiState. If a field of the same name exists in both classes always
+    access the value from state.
+    """
+
+    verb: Verb
+    cmdline: list[str]
+    force: int
+
+    distribution: Distribution
+    release: str
+    mirror: Optional[str]
+    local_mirror: Optional[str]
+    repository_key_check: bool
+    repositories: list[str]
+    repo_dirs: list[Path]
+    repart_dirs: list[Path]
+    overlay: bool
+    architecture: str
+    output_format: OutputFormat
+    manifest_format: list[ManifestFormat]
+    output: Path
+    output_dir: Optional[Path]
+    kernel_command_line: list[str]
+    secure_boot: bool
+    secure_boot_key: Optional[Path]
+    secure_boot_certificate: Optional[Path]
+    secure_boot_valid_days: str
+    secure_boot_common_name: str
+    sign_expected_pcr: bool
+    compress_output: Compression
+    image_version: Optional[str]
+    image_id: Optional[str]
+    tar_strip_selinux_context: bool
+    incremental: bool
+    cache_initrd: bool
+    packages: list[str]
+    remove_packages: list[str]
+    with_docs: bool
+    with_tests: bool
+    cache_dir: Optional[Path]
+    base_trees: list[Path]
+    extra_trees: list[tuple[Path, Optional[Path]]]
+    skeleton_trees: list[tuple[Path, Optional[Path]]]
+    clean_package_metadata: Optional[bool]
+    remove_files: list[str]
+    environment: dict[str, str]
+    build_sources: Path
+    build_dir: Optional[Path]
+    install_dir: Optional[Path]
+    build_packages: list[str]
+    build_script: Optional[Path]
+    prepare_script: Optional[Path]
+    postinst_script: Optional[Path]
+    finalize_script: Optional[Path]
+    with_network: bool
+    cache_only: bool
+    nspawn_settings: Optional[Path]
+    checksum: bool
+    split_artifacts: bool
+    sign: bool
+    key: Optional[str]
+    password: Optional[str]
+    password_is_hashed: bool
+    autologin: bool
+    extra_search_paths: list[Path]
+    ephemeral: bool
+    ssh: bool
+    credentials: dict[str, str]
+    directory: Optional[Path]
+    debug: list[str]
+    auto_bump: bool
+    workspace_dir: Optional[Path]
+    initrds: list[Path]
+    make_initrd: bool
+    kernel_command_line_extra: list[str]
+    acl: bool
+    pager: bool
+    bootable: Optional[bool]
+
+    # QEMU-specific options
+    qemu_gui: bool
+    qemu_smp: str
+    qemu_mem: str
+    qemu_kvm: bool
+    qemu_args: Sequence[str]
+
+    passphrase: Optional[Path]
+
+    def architecture_is_native(self) -> bool:
+        return self.architecture == platform.machine()
+
+    @property
+    def output_split_uki(self) -> Path:
+        return build_auxiliary_output_path(self, ".efi")
+
+    @property
+    def output_split_kernel(self) -> Path:
+        return build_auxiliary_output_path(self, ".vmlinuz")
+
+    @property
+    def output_nspawn_settings(self) -> Path:
+        return build_auxiliary_output_path(self, ".nspawn")
+
+    @property
+    def output_checksum(self) -> Path:
+        return Path("SHA256SUMS")
+
+    @property
+    def output_signature(self) -> Path:
+        return Path("SHA256SUMS.gpg")
+
+    @property
+    def output_sshkey(self) -> Path:
+        return build_auxiliary_output_path(self, ".ssh")
+
+    @property
+    def output_manifest(self) -> Path:
+        return build_auxiliary_output_path(self, ".manifest")
+
+    @property
+    def output_changelog(self) -> Path:
+        return build_auxiliary_output_path(self, ".changelog")
+
+    @property
+    def output_compressed(self) -> Path:
+        if self.compress_output == Compression.none:
+            return self.output
+
+        return self.output.parent / f"{self.output.name}.{self.compress_output.name}"
+
+    def output_paths(self) -> tuple[Path, ...]:
+        return (
+            self.output,
+            self.output_split_uki,
+            self.output_split_kernel,
+            self.output_nspawn_settings,
+            self.output_checksum,
+            self.output_signature,
+            self.output_sshkey,
+            self.output_manifest,
+            self.output_changelog,
+        )
+
+
+
+def strip_suffixes(path: Path) -> Path:
+    while path.suffix in {
+        ".xz",
+        ".zstd",
+        ".raw",
+        ".tar",
+        ".cpio",
+    }:
+        path = path.with_suffix("")
+
+    return path
+
+
+def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path:
+    output = strip_suffixes(args.output)
+    return output.with_name(f"{output.name}{suffix}")
+
+
+def find_image_version(args: argparse.Namespace) -> None:
+    if args.image_version is not None:
+        return
+
+    try:
+        with open("mkosi.version") as f:
+            args.image_version = f.read().strip()
+    except FileNotFoundError:
+        pass
+
+
+def require_private_file(name: Path, description: str) -> None:
+    mode = os.stat(name).st_mode & 0o777
+    if mode & 0o007:
+        logging.warning(textwrap.dedent(f"""\
+            Permissions of '{name}' of '{mode:04o}' are too open.
+            When creating {description} files use an access mode that restricts access to the owner only.
+        """))
+
+
+def find_password(args: argparse.Namespace) -> None:
+    if args.password is not None:
+        return
+
+    try:
+        pwfile = Path("mkosi.rootpw")
+        require_private_file(pwfile, "root password")
+
+        args.password = pwfile.read_text().strip()
+
+    except FileNotFoundError:
+        pass
+
+
+
+def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
+    return config.image_id or config.output.with_suffix("").name.partition("_")[0]
+
+
+def load_credentials(args: argparse.Namespace) -> dict[str, str]:
+    creds = {}
+
+    d = Path("mkosi.credentials")
+    if d.is_dir():
+        for e in d.iterdir():
+            if os.access(e, os.X_OK):
+                creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout
+            else:
+                creds[e.name] = e.read_text()
+
+    for s in args.credentials:
+        key, _, value = s.partition("=")
+        creds[key] = value
+
+    if "firstboot.timezone" not in creds:
+        tz = run(
+            ["timedatectl", "show", "-p", "Timezone", "--value"],
+            text=True,
+            stdout=subprocess.PIPE,
+        ).stdout.strip()
+        creds["firstboot.timezone"] = tz
+
+    if "firstboot.locale" not in creds:
+        creds["firstboot.locale"] = "C.UTF-8"
+
+    if "firstboot.hostname" not in creds:
+        creds["firstboot.hostname"] = machine_name(args)
+
+    if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ:
+        key = run(
+            ["ssh-add", "-L"],
+            text=True,
+            stdout=subprocess.PIPE,
+            env=os.environ,
+        ).stdout.strip()
+        creds["ssh.authorized_keys.root"] = key
+
+    return creds
+
+
+def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]:
+    columns, lines = shutil.get_terminal_size()
+
+    cmdline = [
+        f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}",
+        f"systemd.tty.columns.hvc0={columns}",
+        f"systemd.tty.rows.hvc0={lines}",
+        f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}",
+        f"systemd.tty.columns.ttyS0={columns}",
+        f"systemd.tty.rows.ttyS0={lines}",
+        "console=hvc0",
+    ]
+
+    if args.output_format == OutputFormat.cpio:
+        cmdline += ["rd.systemd.unit=default.target"]
+
+    for s in args.kernel_command_line_extra:
+        key, sep, value = s.partition("=")
+        if " " in value:
+            value = f'"{value}"'
+        cmdline += [key if not sep else f"{key}={value}"]
+
+    return cmdline
+
+
+def load_args(args: argparse.Namespace) -> MkosiConfig:
+    ARG_DEBUG.update(args.debug)
+
+    find_image_version(args)
+
+    if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
+        die(f"Parameters after verb are only accepted for {' '.join(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.")
+
+    if args.verb == Verb.qemu and args.output_format in (
+        OutputFormat.directory,
+        OutputFormat.subvolume,
+        OutputFormat.tar,
+    ):
+        die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.")
+
+    if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context:
+        die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context")
+
+    if args.cache_dir:
+        args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}"
+    if args.build_dir:
+        args.build_dir = args.build_dir / f"{args.distribution}~{args.release}"
+    if args.output_dir:
+        args.output_dir = args.output_dir / f"{args.distribution}~{args.release}"
+
+    if args.mirror is None:
+        if args.distribution in (Distribution.fedora, Distribution.centos):
+            args.mirror = None
+        elif args.distribution == Distribution.debian:
+            args.mirror = "http://deb.debian.org/debian"
+        elif args.distribution == Distribution.ubuntu:
+            if args.architecture == "x86" or args.architecture == "x86_64":
+                args.mirror = "http://archive.ubuntu.com/ubuntu"
+            else:
+                args.mirror = "http://ports.ubuntu.com"
+        elif args.distribution == Distribution.arch:
+            if args.architecture == "aarch64":
+                args.mirror = "http://mirror.archlinuxarm.org"
+            else:
+                args.mirror = "https://geo.mirror.pkgbuild.com"
+        elif args.distribution == Distribution.opensuse:
+            args.mirror = "https://download.opensuse.org"
+        elif args.distribution == Distribution.rocky:
+            args.mirror = None
+        elif args.distribution == Distribution.alma:
+            args.mirror = None
+
+    if args.sign:
+        args.checksum = True
+
+    if args.compress_output is None:
+        args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none
+
+    if args.output is None:
+        iid = args.image_id if args.image_id is not None else "image"
+        prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid
+
+        if args.output_format == OutputFormat.disk:
+            output = f"{prefix}.raw"
+        elif args.output_format == OutputFormat.tar:
+            output = f"{prefix}.tar"
+        elif args.output_format == OutputFormat.cpio:
+            output = f"{prefix}.cpio"
+        else:
+            output = prefix
+        args.output = Path(output)
+
+    if args.output_dir is not None:
+        if "/" not in str(args.output):
+            args.output = args.output_dir / args.output
+        else:
+            logging.warning("Ignoring configured output directory as output file is a qualified path.")
+
+    args.output = args.output.absolute()
+
+    if args.environment:
+        env = {}
+        for s in args.environment:
+            key, sep, value = s.partition("=")
+            value = value if sep else os.getenv(key, "")
+            env[key] = value
+        args.environment = env
+    else:
+        args.environment = {}
+
+    args.credentials = load_credentials(args)
+    args.kernel_command_line_extra = load_kernel_command_line_extra(args)
+
+    if args.secure_boot and args.verb != Verb.genkey:
+        if args.secure_boot_key is None:
+            die("UEFI SecureBoot enabled, but couldn't find private key.",
+                hint="Consider placing it in mkosi.secure-boot.key")
+
+        if args.secure_boot_certificate is None:
+            die("UEFI SecureBoot enabled, but couldn't find certificate.",
+                hint="Consider placing it in mkosi.secure-boot.crt")
+
+    if args.sign_expected_pcr is True and not shutil.which("systemd-measure"):
+        die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.")
+
+    if args.sign_expected_pcr is None:
+        args.sign_expected_pcr = bool(shutil.which("systemd-measure"))
+
+    # Resolve passwords late so we can accurately determine whether a build is needed
+    find_password(args)
+
+    if args.verb in (Verb.shell, Verb.boot):
+        opname = "acquire shell" if args.verb == Verb.shell else "boot"
+        if args.output_format in (OutputFormat.tar, OutputFormat.cpio):
+            die(f"Sorry, can't {opname} with a {args.output_format} archive.")
+        if args.compress_output != Compression.none:
+            die(f"Sorry, can't {opname} with a compressed image.")
+
+    if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch):
+        die("--repo-dir is only supported on DNF based distributions and Arch")
+
+    if args.qemu_kvm is True and not qemu_check_kvm_support():
+        die("Sorry, the host machine does not support KVM acceleration.")
+
+    if args.qemu_kvm is None:
+        args.qemu_kvm = qemu_check_kvm_support()
+
+    if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu):
+        die("Sorry, the --repositories option is only supported on DNF/Debian based distributions")
+
+    if args.initrds:
+        args.initrds = [p.absolute() for p in args.initrds]
+        for p in args.initrds:
+            if not p.exists():
+                die(f"Initrd {p} not found")
+            if not p.is_file():
+                die(f"Initrd {p} is not a file")
+
+    if args.overlay and not args.base_trees:
+        die("--overlay can only be used with --base-tree")
+
+    # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
+    with prepend_to_environ_path(args.extra_search_paths):
+        if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
+            die("This unprivileged build configuration requires at least Linux v5.11")
+
+    return MkosiConfig(**vars(args))
+
index 61b58eb835567635d0bfebcade4b4347d6ba38b5..6750b88c353bc32a7db373f974ba6565f5a48c0d 100644 (file)
@@ -5,12 +5,13 @@ import shutil
 from collections.abc import Sequence
 from pathlib import Path
 
+from mkosi.config import MkosiConfig
 from mkosi.distributions import DistributionInstaller
 from mkosi.distributions.fedora import Repo, invoke_dnf, setup_dnf
 from mkosi.log import complete_step, die
 from mkosi.remove import unlink_try_hard
 from mkosi.state import MkosiState
-from mkosi.util import Distribution, MkosiConfig
+from mkosi.util import Distribution
 
 
 def move_rpm_db(root: Path) -> None:
index bda72a4d1241a19a767c30be7004d3ae01cc1d0d..89917049216a8262261b9beba0d79d9dc2fa99ab 100644 (file)
@@ -1,7 +1,6 @@
 # SPDX-License-Identifier: LGPL-2.1+
 
 import dataclasses
-import functools
 import json
 from datetime import datetime
 from pathlib import Path
@@ -9,8 +8,9 @@ from subprocess import DEVNULL, PIPE
 from textwrap import dedent
 from typing import IO, Any, Optional
 
+from mkosi.config import MkosiConfig
 from mkosi.run import run
-from mkosi.util import Distribution, ManifestFormat, MkosiConfig, PackageType
+from mkosi.util import Distribution, ManifestFormat, PackageType
 
 
 @dataclasses.dataclass
@@ -289,21 +289,3 @@ class Manifest:
         for package in self.source_packages.values():
             print(f"\n{80*'-'}\n", file=out)
             out.write(package.report())
-
-
-@functools.total_ordering
-class GenericVersion:
-    def __init__(self, version: str):
-        self._version = version
-
-    def __eq__(self, other: object) -> bool:
-        if not isinstance(other, GenericVersion):
-            return False
-        cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version]
-        return run(cmd, check=False).returncode == 0
-
-    def __lt__(self, other: object) -> bool:
-        if not isinstance(other, GenericVersion):
-            return False
-        cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version]
-        return run(cmd, check=False).returncode == 0
index 6eef34d41b15610b55386defa3c9c2193ae3fd2c..87ab0e54882d1b4a8b77782d241338c0d50c14ac 100644 (file)
@@ -9,8 +9,8 @@ from collections.abc import Iterator, Sequence
 from pathlib import Path
 from typing import Callable, Deque, Optional, TypeVar, Union, cast
 
+from mkosi.config import GenericVersion
 from mkosi.log import complete_step
-from mkosi.manifest import GenericVersion
 from mkosi.run import run
 from mkosi.types import PathString
 
index fa326dad3d96baea52695b17bd7329d1275d8275..386317d4c0049643eb0b2ed97aae8005f1f5d47e 100644 (file)
@@ -4,9 +4,9 @@ import dataclasses
 import importlib
 from pathlib import Path
 
+from mkosi.config import MkosiConfig
 from mkosi.distributions import DistributionInstaller
 from mkosi.log import die
-from mkosi.util import MkosiConfig
 
 
 @dataclasses.dataclass
index 1a6071f84785263857a8f04674ff887dd9a62072..281347e4eeb77f08259e8429aec28f34f09086d4 100644 (file)
@@ -1,6 +1,5 @@
 # SPDX-License-Identifier: LGPL-2.1+
 
-import argparse
 import ast
 import contextlib
 import dataclasses
@@ -9,16 +8,16 @@ import functools
 import getpass
 import itertools
 import os
-import platform
 import pwd
 import re
 import resource
 import shutil
 import sys
 import tarfile
+import tempfile
 from collections.abc import Iterable, Iterator, Sequence
 from pathlib import Path
-from typing import Any, Callable, Optional, TypeVar, Union
+from typing import Any, Callable, Optional, TypeVar
 
 from mkosi.log import die
 
@@ -183,174 +182,6 @@ class ManifestFormat(str, enum.Enum):
     changelog = "changelog"  # human-readable text file with package changelogs
 
 
-KNOWN_SUFFIXES = {
-    ".xz",
-    ".zstd",
-    ".raw",
-    ".tar",
-    ".cpio",
-}
-
-
-def strip_suffixes(path: Path) -> Path:
-    while path.suffix in KNOWN_SUFFIXES:
-        path = path.with_suffix("")
-    return path
-
-
-@dataclasses.dataclass(frozen=True)
-class MkosiConfig:
-    """Type-hinted storage for command line arguments.
-
-    Only user configuration is stored here while dynamic state exists in
-    MkosiState. If a field of the same name exists in both classes always
-    access the value from state.
-    """
-
-    verb: Verb
-    cmdline: list[str]
-    force: int
-
-    distribution: Distribution
-    release: str
-    mirror: Optional[str]
-    local_mirror: Optional[str]
-    repository_key_check: bool
-    repositories: list[str]
-    repo_dirs: list[Path]
-    repart_dirs: list[Path]
-    overlay: bool
-    architecture: str
-    output_format: OutputFormat
-    manifest_format: list[ManifestFormat]
-    output: Path
-    output_dir: Optional[Path]
-    kernel_command_line: list[str]
-    secure_boot: bool
-    secure_boot_key: Optional[Path]
-    secure_boot_certificate: Optional[Path]
-    secure_boot_valid_days: str
-    secure_boot_common_name: str
-    sign_expected_pcr: bool
-    compress_output: Compression
-    image_version: Optional[str]
-    image_id: Optional[str]
-    tar_strip_selinux_context: bool
-    incremental: bool
-    cache_initrd: bool
-    packages: list[str]
-    remove_packages: list[str]
-    with_docs: bool
-    with_tests: bool
-    cache_dir: Optional[Path]
-    base_trees: list[Path]
-    extra_trees: list[tuple[Path, Optional[Path]]]
-    skeleton_trees: list[tuple[Path, Optional[Path]]]
-    clean_package_metadata: Optional[bool]
-    remove_files: list[str]
-    environment: dict[str, str]
-    build_sources: Path
-    build_dir: Optional[Path]
-    install_dir: Optional[Path]
-    build_packages: list[str]
-    build_script: Optional[Path]
-    prepare_script: Optional[Path]
-    postinst_script: Optional[Path]
-    finalize_script: Optional[Path]
-    with_network: bool
-    cache_only: bool
-    nspawn_settings: Optional[Path]
-    checksum: bool
-    split_artifacts: bool
-    sign: bool
-    key: Optional[str]
-    password: Optional[str]
-    password_is_hashed: bool
-    autologin: bool
-    extra_search_paths: list[Path]
-    ephemeral: bool
-    ssh: bool
-    credentials: dict[str, str]
-    directory: Optional[Path]
-    debug: list[str]
-    auto_bump: bool
-    workspace_dir: Optional[Path]
-    initrds: list[Path]
-    make_initrd: bool
-    kernel_command_line_extra: list[str]
-    acl: bool
-    pager: bool
-    bootable: Optional[bool]
-
-    # QEMU-specific options
-    qemu_gui: bool
-    qemu_smp: str
-    qemu_mem: str
-    qemu_kvm: bool
-    qemu_args: Sequence[str]
-
-    passphrase: Optional[Path]
-
-    def architecture_is_native(self) -> bool:
-        return self.architecture == platform.machine()
-
-    @property
-    def output_split_uki(self) -> Path:
-        return build_auxiliary_output_path(self, ".efi")
-
-    @property
-    def output_split_kernel(self) -> Path:
-        return build_auxiliary_output_path(self, ".vmlinuz")
-
-    @property
-    def output_nspawn_settings(self) -> Path:
-        return build_auxiliary_output_path(self, ".nspawn")
-
-    @property
-    def output_checksum(self) -> Path:
-        return Path("SHA256SUMS")
-
-    @property
-    def output_signature(self) -> Path:
-        return Path("SHA256SUMS.gpg")
-
-    @property
-    def output_sshkey(self) -> Path:
-        return build_auxiliary_output_path(self, ".ssh")
-
-    @property
-    def output_manifest(self) -> Path:
-        return build_auxiliary_output_path(self, ".manifest")
-
-    @property
-    def output_changelog(self) -> Path:
-        return build_auxiliary_output_path(self, ".changelog")
-
-    @property
-    def output_compressed(self) -> Path:
-        if self.compress_output == Compression.none:
-            return self.output
-
-        return self.output.parent / f"{self.output.name}.{self.compress_output.name}"
-
-    def output_paths(self) -> tuple[Path, ...]:
-        return (
-            self.output,
-            self.output_split_uki,
-            self.output_split_kernel,
-            self.output_nspawn_settings,
-            self.output_checksum,
-            self.output_signature,
-            self.output_sshkey,
-            self.output_manifest,
-            self.output_changelog,
-        )
-
-
-def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path:
-    output = strip_suffixes(args.output)
-    return output.with_name(f"{output.name}{suffix}")
-
 
 def format_rlimit(rlimit: int) -> str:
     limits = resource.getrlimit(rlimit)
@@ -473,3 +304,36 @@ def chdir(directory: Path) -> Iterator[None]:
         yield
     finally:
         os.chdir(old)
+
+
+@contextlib.contextmanager
+def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]:
+    if not paths:
+        yield
+        return
+
+    with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d:
+
+        for path in paths:
+            if not path.is_dir():
+                Path(d).joinpath(path.name).symlink_to(path.absolute())
+
+        paths = [Path(d), *paths]
+
+        news = [os.fspath(path) for path in paths if path.is_dir()]
+        olds = os.getenv("PATH", "").split(":")
+        os.environ["PATH"] = ":".join(news + olds)
+
+        yield
+
+
+def qemu_check_kvm_support() -> bool:
+    kvm = Path("/dev/kvm")
+    if not kvm.is_char_device():
+        return False
+    # some CI runners may present a non-working KVM device
+    try:
+        with kvm.open("r+b"):
+            return True
+    except OSError:
+        return False
diff --git a/tests/test_config.py b/tests/test_config.py
new file mode 100644 (file)
index 0000000..26350c3
--- /dev/null
@@ -0,0 +1,16 @@
+# SPDX-License-Identifier: LGPL-2.1+
+
+from pathlib import Path
+
+from mkosi.config import strip_suffixes
+
+
+def test_strip_suffixes() -> None:
+    assert strip_suffixes(Path("home/test.zstd")) == Path("home/test")
+    assert strip_suffixes(Path("home/test.xz")) == Path("home/test")
+    assert strip_suffixes(Path("home/test.raw")) == Path("home/test")
+    assert strip_suffixes(Path("home/test.tar")) == Path("home/test")
+    assert strip_suffixes(Path("home/test.cpio")) == Path("home/test")
+    assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test")
+    assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test")
+    assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")
index 38bd63f9a16fdbeff3474b4b3483c396bdf245e3..9a06e66ff2d65cd13ba035b80fb4c73b7c5dc6a5 100644 (file)
@@ -9,9 +9,8 @@ from typing import Iterator, List, Optional
 
 import pytest
 
-import mkosi
-from mkosi.util import Compression, Distribution, MkosiConfig, Verb
-from mkosi.config import MkosiConfigParser
+from mkosi.util import Compression, Distribution, Verb
+from mkosi.config import MkosiConfigParser, MkosiConfig, load_args
 
 
 @contextmanager
@@ -27,7 +26,7 @@ def cd_temp_dir() -> Iterator[None]:
 
 
 def parse(argv: Optional[List[str]] = None) -> MkosiConfig:
-    return mkosi.load_args(MkosiConfigParser().parse(argv))
+    return load_args(MkosiConfigParser().parse(argv))
 
 
 def test_parse_load_verb() -> None:
index 448aa05965bde9a20c73145470e4c53b0ff77cf4..614df2cc6848eb10b0e12423bea69249b2ad74b9 100644 (file)
@@ -12,7 +12,6 @@ from mkosi.util import (
     PackageType,
     safe_tar_extract,
     set_umask,
-    strip_suffixes,
 )
 
 def test_distribution() -> None:
@@ -59,13 +58,3 @@ def test_safe_tar_extract(tmp_path: Path) -> None:
     assert not (evil_target / name).exists()
     assert not (Path("/tmp") / name).exists()
 
-
-def test_strip_suffixes() -> None:
-    assert strip_suffixes(Path("home/test.zstd")) == Path("home/test")
-    assert strip_suffixes(Path("home/test.xz")) == Path("home/test")
-    assert strip_suffixes(Path("home/test.raw")) == Path("home/test")
-    assert strip_suffixes(Path("home/test.tar")) == Path("home/test")
-    assert strip_suffixes(Path("home/test.cpio")) == Path("home/test")
-    assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test")
-    assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test")
-    assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")