And all the other related changes to make that work.
import json
import logging
import os
-import platform
import resource
import shutil
import subprocess
from textwrap import dedent
from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast
+from mkosi.config import GenericVersion, MkosiConfig, machine_name
from mkosi.install import add_dropin_config_from_resource, copy_path, flock
from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step
-from mkosi.manifest import GenericVersion, Manifest
+from mkosi.manifest import Manifest
from mkosi.mounts import dissect_and_mount, mount_overlay, scandir_recursive
from mkosi.pager import page
from mkosi.remove import unlink_try_hard
Compression,
Distribution,
ManifestFormat,
- MkosiConfig,
OutputFormat,
Verb,
current_user,
flatten,
format_rlimit,
- is_dnf_distribution,
patch_file,
+ prepend_to_environ_path,
set_umask,
tmp_dir,
)
MKOSI_COMMANDS_NEED_BUILD = (Verb.shell, Verb.boot, Verb.qemu, Verb.serve)
MKOSI_COMMANDS_SUDO = (Verb.shell, Verb.boot)
-MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh)
T = TypeVar("T")
-def list_to_string(seq: Iterator[str]) -> str:
- """Print contents of a list to a comma-separated string
-
- ['a', "b", 11] → "'a', 'b', 11"
- """
- return str(list(seq))[1:-1]
-
-
# EFI has its own conventions too
EFI_ARCHITECTURES = {
"x86_64": "x64",
empty_directory(config.cache_dir)
-def require_private_file(name: Path, description: str) -> None:
- mode = os.stat(name).st_mode & 0o777
- if mode & 0o007:
- logging.warning(dedent(f"""\
- Permissions of '{name}' of '{mode:04o}' are too open.
- When creating {description} files use an access mode that restricts access to the owner only.
- """))
-
-
-def find_password(args: argparse.Namespace) -> None:
- if args.password is not None:
- return
-
- try:
- pwfile = Path("mkosi.rootpw")
- require_private_file(pwfile, "root password")
-
- args.password = pwfile.read_text().strip()
-
- except FileNotFoundError:
- pass
-
-
-def find_image_version(args: argparse.Namespace) -> None:
- if args.image_version is not None:
- return
-
- try:
- with open("mkosi.version") as f:
- args.image_version = f.read().strip()
- except FileNotFoundError:
- pass
-
-
-def load_credentials(args: argparse.Namespace) -> dict[str, str]:
- creds = {}
-
- d = Path("mkosi.credentials")
- if d.is_dir():
- for e in d.iterdir():
- if os.access(e, os.X_OK):
- creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout
- else:
- creds[e.name] = e.read_text()
-
- for s in args.credentials:
- key, _, value = s.partition("=")
- creds[key] = value
-
- if "firstboot.timezone" not in creds:
- tz = run(
- ["timedatectl", "show", "-p", "Timezone", "--value"],
- text=True,
- stdout=subprocess.PIPE,
- ).stdout.strip()
- creds["firstboot.timezone"] = tz
-
- if "firstboot.locale" not in creds:
- creds["firstboot.locale"] = "C.UTF-8"
-
- if "firstboot.hostname" not in creds:
- creds["firstboot.hostname"] = machine_name(args)
-
- if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ:
- key = run(
- ["ssh-add", "-L"],
- text=True,
- stdout=subprocess.PIPE,
- env=os.environ,
- ).stdout.strip()
- creds["ssh.authorized_keys.root"] = key
-
- return creds
-
-
-def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]:
- columns, lines = shutil.get_terminal_size()
-
- cmdline = [
- f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}",
- f"systemd.tty.columns.hvc0={columns}",
- f"systemd.tty.rows.hvc0={lines}",
- f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}",
- f"systemd.tty.columns.ttyS0={columns}",
- f"systemd.tty.rows.ttyS0={lines}",
- "console=hvc0",
- ]
-
- if args.output_format == OutputFormat.cpio:
- cmdline += ["rd.systemd.unit=default.target"]
-
- for s in args.kernel_command_line_extra:
- key, sep, value = s.partition("=")
- if " " in value:
- value = f'"{value}"'
- cmdline += [key if not sep else f"{key}={value}"]
-
- return cmdline
-
-
-def load_args(args: argparse.Namespace) -> MkosiConfig:
- ARG_DEBUG.update(args.debug)
-
- find_image_version(args)
-
- if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
- die(f"Parameters after verb are only accepted for {list_to_string(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.")
-
- if args.verb == Verb.qemu and args.output_format in (
- OutputFormat.directory,
- OutputFormat.subvolume,
- OutputFormat.tar,
- ):
- die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.")
-
- if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context:
- die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context")
-
- if args.cache_dir:
- args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}"
- if args.build_dir:
- args.build_dir = args.build_dir / f"{args.distribution}~{args.release}"
- if args.output_dir:
- args.output_dir = args.output_dir / f"{args.distribution}~{args.release}"
-
- if args.mirror is None:
- if args.distribution in (Distribution.fedora, Distribution.centos):
- args.mirror = None
- elif args.distribution == Distribution.debian:
- args.mirror = "http://deb.debian.org/debian"
- elif args.distribution == Distribution.ubuntu:
- if args.architecture == "x86" or args.architecture == "x86_64":
- args.mirror = "http://archive.ubuntu.com/ubuntu"
- else:
- args.mirror = "http://ports.ubuntu.com"
- elif args.distribution == Distribution.arch:
- if args.architecture == "aarch64":
- args.mirror = "http://mirror.archlinuxarm.org"
- else:
- args.mirror = "https://geo.mirror.pkgbuild.com"
- elif args.distribution == Distribution.opensuse:
- args.mirror = "https://download.opensuse.org"
- elif args.distribution == Distribution.rocky:
- args.mirror = None
- elif args.distribution == Distribution.alma:
- args.mirror = None
-
- if args.sign:
- args.checksum = True
-
- if args.compress_output is None:
- args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none
-
- if args.output is None:
- iid = args.image_id if args.image_id is not None else "image"
- prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid
-
- if args.output_format == OutputFormat.disk:
- output = f"{prefix}.raw"
- elif args.output_format == OutputFormat.tar:
- output = f"{prefix}.tar"
- elif args.output_format == OutputFormat.cpio:
- output = f"{prefix}.cpio"
- else:
- output = prefix
- args.output = Path(output)
-
- if args.output_dir is not None:
- if "/" not in str(args.output):
- args.output = args.output_dir / args.output
- else:
- logging.warning("Ignoring configured output directory as output file is a qualified path.")
-
- args.output = args.output.absolute()
-
- if args.environment:
- env = {}
- for s in args.environment:
- key, sep, value = s.partition("=")
- value = value if sep else os.getenv(key, "")
- env[key] = value
- args.environment = env
- else:
- args.environment = {}
-
- args.credentials = load_credentials(args)
- args.kernel_command_line_extra = load_kernel_command_line_extra(args)
-
- if args.secure_boot and args.verb != Verb.genkey:
- if args.secure_boot_key is None:
- die("UEFI SecureBoot enabled, but couldn't find private key.",
- hint="Consider placing it in mkosi.secure-boot.key")
-
- if args.secure_boot_certificate is None:
- die("UEFI SecureBoot enabled, but couldn't find certificate.",
- hint="Consider placing it in mkosi.secure-boot.crt")
-
- if args.sign_expected_pcr is True and not shutil.which("systemd-measure"):
- die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.")
-
- if args.sign_expected_pcr is None:
- args.sign_expected_pcr = bool(shutil.which("systemd-measure"))
-
- # Resolve passwords late so we can accurately determine whether a build is needed
- find_password(args)
-
- if args.verb in (Verb.shell, Verb.boot):
- opname = "acquire shell" if args.verb == Verb.shell else "boot"
- if args.output_format in (OutputFormat.tar, OutputFormat.cpio):
- die(f"Sorry, can't {opname} with a {args.output_format} archive.")
- if args.compress_output != Compression.none:
- die(f"Sorry, can't {opname} with a compressed image.")
-
- if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch):
- die("--repo-dir is only supported on DNF based distributions and Arch")
-
- if args.qemu_kvm is True and not qemu_check_kvm_support():
- die("Sorry, the host machine does not support KVM acceleration.")
-
- if args.qemu_kvm is None:
- args.qemu_kvm = qemu_check_kvm_support()
-
- if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu):
- die("Sorry, the --repositories option is only supported on DNF/Debian based distributions")
-
- if args.initrds:
- args.initrds = [p.absolute() for p in args.initrds]
- for p in args.initrds:
- if not p.exists():
- die(f"Initrd {p} not found")
- if not p.is_file():
- die(f"Initrd {p} is not a file")
-
- if args.overlay and not args.base_trees:
- die("--overlay can only be used with --base-tree")
-
- # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
- with prepend_to_environ_path(args.extra_search_paths):
- if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
- die("This unprivileged build configuration requires at least Linux v5.11")
-
- return MkosiConfig(**vars(args))
-
-
def cache_tree_paths(config: MkosiConfig) -> tuple[Path, Path]:
# If the image ID is specified, use cache file names that are independent of the image versions, so that
die("Must be invoked as root.")
-def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
- return config.image_id or config.output.with_suffix("").name.partition("_")[0]
-
-
def machine_cid(config: MkosiConfig) -> int:
cid = int.from_bytes(hashlib.sha256(machine_name(config).encode()).digest()[:4], byteorder='little')
# Make sure we don't return any of the well-known CIDs.
die("Couldn't find OVMF UEFI variables file.")
-def qemu_check_kvm_support() -> bool:
- kvm = Path("/dev/kvm")
- if not kvm.is_char_device():
- return False
- # some CI runners may present a non-working KVM device
- try:
- with kvm.open("r+b"):
- return True
- except OSError:
- return False
-
-
@contextlib.contextmanager
def start_swtpm() -> Iterator[Optional[Path]]:
Path("mkosi.version").write_text(new_version + "\n")
-@contextlib.contextmanager
-def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]:
- if not paths:
- yield
- return
-
- with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d:
-
- for path in paths:
- if not path.is_dir():
- Path(d).joinpath(path.name).symlink_to(path.absolute())
-
- paths = [Path(d), *paths]
-
- news = [os.fspath(path) for path in paths if path.is_dir()]
- olds = os.getenv("PATH", "").split(":")
- os.environ["PATH"] = ":".join(news + olds)
-
- yield
-
-
def expand_specifier(s: str) -> str:
return s.replace("%u", current_user().name)
import sys
from collections.abc import Iterator
-from mkosi import load_args, run_verb
-from mkosi.config import MkosiConfigParser
+from mkosi import run_verb
+from mkosi.config import MkosiConfigParser, load_args
from mkosi.log import ARG_DEBUG, die, log_setup
from mkosi.run import excepthook
import enum
import fnmatch
import functools
+import logging
import os.path
import platform
import shlex
+import shutil
+import subprocess
import sys
import textwrap
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Callable, Optional, Type, Union, cast
-from mkosi.log import Style, die
+from mkosi.log import ARG_DEBUG, Style, die
from mkosi.pager import page
+from mkosi.run import run
from mkosi.util import (
Compression,
Distribution,
current_user,
detect_distribution,
flatten,
+ is_dnf_distribution,
+ prepend_to_environ_path,
+ qemu_check_kvm_support,
)
__version__ = "14"
+MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh)
ConfigParseCallback = Callable[[str, Optional[str], argparse.Namespace], Any]
ConfigMatchCallback = Callable[[str, str, argparse.Namespace], bool]
setattr(namespace, s.dest, default)
return namespace
+
+
+
+@functools.total_ordering
+class GenericVersion:
+ def __init__(self, version: str):
+ self._version = version
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, GenericVersion):
+ return False
+ cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version]
+ return run(cmd, check=False).returncode == 0
+
+ def __lt__(self, other: object) -> bool:
+ if not isinstance(other, GenericVersion):
+ return False
+ cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version]
+ return run(cmd, check=False).returncode == 0
+
+
+
+@dataclasses.dataclass(frozen=True)
+class MkosiConfig:
+ """Type-hinted storage for command line arguments.
+
+ Only user configuration is stored here while dynamic state exists in
+ MkosiState. If a field of the same name exists in both classes always
+ access the value from state.
+ """
+
+ verb: Verb
+ cmdline: list[str]
+ force: int
+
+ distribution: Distribution
+ release: str
+ mirror: Optional[str]
+ local_mirror: Optional[str]
+ repository_key_check: bool
+ repositories: list[str]
+ repo_dirs: list[Path]
+ repart_dirs: list[Path]
+ overlay: bool
+ architecture: str
+ output_format: OutputFormat
+ manifest_format: list[ManifestFormat]
+ output: Path
+ output_dir: Optional[Path]
+ kernel_command_line: list[str]
+ secure_boot: bool
+ secure_boot_key: Optional[Path]
+ secure_boot_certificate: Optional[Path]
+ secure_boot_valid_days: str
+ secure_boot_common_name: str
+ sign_expected_pcr: bool
+ compress_output: Compression
+ image_version: Optional[str]
+ image_id: Optional[str]
+ tar_strip_selinux_context: bool
+ incremental: bool
+ cache_initrd: bool
+ packages: list[str]
+ remove_packages: list[str]
+ with_docs: bool
+ with_tests: bool
+ cache_dir: Optional[Path]
+ base_trees: list[Path]
+ extra_trees: list[tuple[Path, Optional[Path]]]
+ skeleton_trees: list[tuple[Path, Optional[Path]]]
+ clean_package_metadata: Optional[bool]
+ remove_files: list[str]
+ environment: dict[str, str]
+ build_sources: Path
+ build_dir: Optional[Path]
+ install_dir: Optional[Path]
+ build_packages: list[str]
+ build_script: Optional[Path]
+ prepare_script: Optional[Path]
+ postinst_script: Optional[Path]
+ finalize_script: Optional[Path]
+ with_network: bool
+ cache_only: bool
+ nspawn_settings: Optional[Path]
+ checksum: bool
+ split_artifacts: bool
+ sign: bool
+ key: Optional[str]
+ password: Optional[str]
+ password_is_hashed: bool
+ autologin: bool
+ extra_search_paths: list[Path]
+ ephemeral: bool
+ ssh: bool
+ credentials: dict[str, str]
+ directory: Optional[Path]
+ debug: list[str]
+ auto_bump: bool
+ workspace_dir: Optional[Path]
+ initrds: list[Path]
+ make_initrd: bool
+ kernel_command_line_extra: list[str]
+ acl: bool
+ pager: bool
+ bootable: Optional[bool]
+
+ # QEMU-specific options
+ qemu_gui: bool
+ qemu_smp: str
+ qemu_mem: str
+ qemu_kvm: bool
+ qemu_args: Sequence[str]
+
+ passphrase: Optional[Path]
+
+ def architecture_is_native(self) -> bool:
+ return self.architecture == platform.machine()
+
+ @property
+ def output_split_uki(self) -> Path:
+ return build_auxiliary_output_path(self, ".efi")
+
+ @property
+ def output_split_kernel(self) -> Path:
+ return build_auxiliary_output_path(self, ".vmlinuz")
+
+ @property
+ def output_nspawn_settings(self) -> Path:
+ return build_auxiliary_output_path(self, ".nspawn")
+
+ @property
+ def output_checksum(self) -> Path:
+ return Path("SHA256SUMS")
+
+ @property
+ def output_signature(self) -> Path:
+ return Path("SHA256SUMS.gpg")
+
+ @property
+ def output_sshkey(self) -> Path:
+ return build_auxiliary_output_path(self, ".ssh")
+
+ @property
+ def output_manifest(self) -> Path:
+ return build_auxiliary_output_path(self, ".manifest")
+
+ @property
+ def output_changelog(self) -> Path:
+ return build_auxiliary_output_path(self, ".changelog")
+
+ @property
+ def output_compressed(self) -> Path:
+ if self.compress_output == Compression.none:
+ return self.output
+
+ return self.output.parent / f"{self.output.name}.{self.compress_output.name}"
+
+ def output_paths(self) -> tuple[Path, ...]:
+ return (
+ self.output,
+ self.output_split_uki,
+ self.output_split_kernel,
+ self.output_nspawn_settings,
+ self.output_checksum,
+ self.output_signature,
+ self.output_sshkey,
+ self.output_manifest,
+ self.output_changelog,
+ )
+
+
+
+def strip_suffixes(path: Path) -> Path:
+ while path.suffix in {
+ ".xz",
+ ".zstd",
+ ".raw",
+ ".tar",
+ ".cpio",
+ }:
+ path = path.with_suffix("")
+
+ return path
+
+
+def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path:
+ output = strip_suffixes(args.output)
+ return output.with_name(f"{output.name}{suffix}")
+
+
+def find_image_version(args: argparse.Namespace) -> None:
+ if args.image_version is not None:
+ return
+
+ try:
+ with open("mkosi.version") as f:
+ args.image_version = f.read().strip()
+ except FileNotFoundError:
+ pass
+
+
+def require_private_file(name: Path, description: str) -> None:
+ mode = os.stat(name).st_mode & 0o777
+ if mode & 0o007:
+ logging.warning(textwrap.dedent(f"""\
+ Permissions of '{name}' of '{mode:04o}' are too open.
+ When creating {description} files use an access mode that restricts access to the owner only.
+ """))
+
+
+def find_password(args: argparse.Namespace) -> None:
+ if args.password is not None:
+ return
+
+ try:
+ pwfile = Path("mkosi.rootpw")
+ require_private_file(pwfile, "root password")
+
+ args.password = pwfile.read_text().strip()
+
+ except FileNotFoundError:
+ pass
+
+
+
+def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
+ return config.image_id or config.output.with_suffix("").name.partition("_")[0]
+
+
+def load_credentials(args: argparse.Namespace) -> dict[str, str]:
+ creds = {}
+
+ d = Path("mkosi.credentials")
+ if d.is_dir():
+ for e in d.iterdir():
+ if os.access(e, os.X_OK):
+ creds[e.name] = run([e], text=True, stdout=subprocess.PIPE, env=os.environ).stdout
+ else:
+ creds[e.name] = e.read_text()
+
+ for s in args.credentials:
+ key, _, value = s.partition("=")
+ creds[key] = value
+
+ if "firstboot.timezone" not in creds:
+ tz = run(
+ ["timedatectl", "show", "-p", "Timezone", "--value"],
+ text=True,
+ stdout=subprocess.PIPE,
+ ).stdout.strip()
+ creds["firstboot.timezone"] = tz
+
+ if "firstboot.locale" not in creds:
+ creds["firstboot.locale"] = "C.UTF-8"
+
+ if "firstboot.hostname" not in creds:
+ creds["firstboot.hostname"] = machine_name(args)
+
+ if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ:
+ key = run(
+ ["ssh-add", "-L"],
+ text=True,
+ stdout=subprocess.PIPE,
+ env=os.environ,
+ ).stdout.strip()
+ creds["ssh.authorized_keys.root"] = key
+
+ return creds
+
+
+def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]:
+ columns, lines = shutil.get_terminal_size()
+
+ cmdline = [
+ f"systemd.tty.term.hvc0={os.getenv('TERM', 'vt220')}",
+ f"systemd.tty.columns.hvc0={columns}",
+ f"systemd.tty.rows.hvc0={lines}",
+ f"systemd.tty.term.ttyS0={os.getenv('TERM', 'vt220')}",
+ f"systemd.tty.columns.ttyS0={columns}",
+ f"systemd.tty.rows.ttyS0={lines}",
+ "console=hvc0",
+ ]
+
+ if args.output_format == OutputFormat.cpio:
+ cmdline += ["rd.systemd.unit=default.target"]
+
+ for s in args.kernel_command_line_extra:
+ key, sep, value = s.partition("=")
+ if " " in value:
+ value = f'"{value}"'
+ cmdline += [key if not sep else f"{key}={value}"]
+
+ return cmdline
+
+
+def load_args(args: argparse.Namespace) -> MkosiConfig:
+ ARG_DEBUG.update(args.debug)
+
+ find_image_version(args)
+
+ if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
+ die(f"Parameters after verb are only accepted for {' '.join(verb.name for verb in MKOSI_COMMANDS_CMDLINE)}.")
+
+ if args.verb == Verb.qemu and args.output_format in (
+ OutputFormat.directory,
+ OutputFormat.subvolume,
+ OutputFormat.tar,
+ ):
+ die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.")
+
+ if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context:
+ die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context")
+
+ if args.cache_dir:
+ args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}"
+ if args.build_dir:
+ args.build_dir = args.build_dir / f"{args.distribution}~{args.release}"
+ if args.output_dir:
+ args.output_dir = args.output_dir / f"{args.distribution}~{args.release}"
+
+ if args.mirror is None:
+ if args.distribution in (Distribution.fedora, Distribution.centos):
+ args.mirror = None
+ elif args.distribution == Distribution.debian:
+ args.mirror = "http://deb.debian.org/debian"
+ elif args.distribution == Distribution.ubuntu:
+ if args.architecture == "x86" or args.architecture == "x86_64":
+ args.mirror = "http://archive.ubuntu.com/ubuntu"
+ else:
+ args.mirror = "http://ports.ubuntu.com"
+ elif args.distribution == Distribution.arch:
+ if args.architecture == "aarch64":
+ args.mirror = "http://mirror.archlinuxarm.org"
+ else:
+ args.mirror = "https://geo.mirror.pkgbuild.com"
+ elif args.distribution == Distribution.opensuse:
+ args.mirror = "https://download.opensuse.org"
+ elif args.distribution == Distribution.rocky:
+ args.mirror = None
+ elif args.distribution == Distribution.alma:
+ args.mirror = None
+
+ if args.sign:
+ args.checksum = True
+
+ if args.compress_output is None:
+ args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none
+
+ if args.output is None:
+ iid = args.image_id if args.image_id is not None else "image"
+ prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid
+
+ if args.output_format == OutputFormat.disk:
+ output = f"{prefix}.raw"
+ elif args.output_format == OutputFormat.tar:
+ output = f"{prefix}.tar"
+ elif args.output_format == OutputFormat.cpio:
+ output = f"{prefix}.cpio"
+ else:
+ output = prefix
+ args.output = Path(output)
+
+ if args.output_dir is not None:
+ if "/" not in str(args.output):
+ args.output = args.output_dir / args.output
+ else:
+ logging.warning("Ignoring configured output directory as output file is a qualified path.")
+
+ args.output = args.output.absolute()
+
+ if args.environment:
+ env = {}
+ for s in args.environment:
+ key, sep, value = s.partition("=")
+ value = value if sep else os.getenv(key, "")
+ env[key] = value
+ args.environment = env
+ else:
+ args.environment = {}
+
+ args.credentials = load_credentials(args)
+ args.kernel_command_line_extra = load_kernel_command_line_extra(args)
+
+ if args.secure_boot and args.verb != Verb.genkey:
+ if args.secure_boot_key is None:
+ die("UEFI SecureBoot enabled, but couldn't find private key.",
+ hint="Consider placing it in mkosi.secure-boot.key")
+
+ if args.secure_boot_certificate is None:
+ die("UEFI SecureBoot enabled, but couldn't find certificate.",
+ hint="Consider placing it in mkosi.secure-boot.crt")
+
+ if args.sign_expected_pcr is True and not shutil.which("systemd-measure"):
+ die("Couldn't find systemd-measure needed for the --sign-expected-pcr option.")
+
+ if args.sign_expected_pcr is None:
+ args.sign_expected_pcr = bool(shutil.which("systemd-measure"))
+
+ # Resolve passwords late so we can accurately determine whether a build is needed
+ find_password(args)
+
+ if args.verb in (Verb.shell, Verb.boot):
+ opname = "acquire shell" if args.verb == Verb.shell else "boot"
+ if args.output_format in (OutputFormat.tar, OutputFormat.cpio):
+ die(f"Sorry, can't {opname} with a {args.output_format} archive.")
+ if args.compress_output != Compression.none:
+ die(f"Sorry, can't {opname} with a compressed image.")
+
+ if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch):
+ die("--repo-dir is only supported on DNF based distributions and Arch")
+
+ if args.qemu_kvm is True and not qemu_check_kvm_support():
+ die("Sorry, the host machine does not support KVM acceleration.")
+
+ if args.qemu_kvm is None:
+ args.qemu_kvm = qemu_check_kvm_support()
+
+ if args.repositories and not is_dnf_distribution(args.distribution) and args.distribution not in (Distribution.debian, Distribution.ubuntu):
+ die("Sorry, the --repositories option is only supported on DNF/Debian based distributions")
+
+ if args.initrds:
+ args.initrds = [p.absolute() for p in args.initrds]
+ for p in args.initrds:
+ if not p.exists():
+ die(f"Initrd {p} not found")
+ if not p.is_file():
+ die(f"Initrd {p} is not a file")
+
+ if args.overlay and not args.base_trees:
+ die("--overlay can only be used with --base-tree")
+
+ # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
+ with prepend_to_environ_path(args.extra_search_paths):
+ if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
+ die("This unprivileged build configuration requires at least Linux v5.11")
+
+ return MkosiConfig(**vars(args))
+
from collections.abc import Sequence
from pathlib import Path
+from mkosi.config import MkosiConfig
from mkosi.distributions import DistributionInstaller
from mkosi.distributions.fedora import Repo, invoke_dnf, setup_dnf
from mkosi.log import complete_step, die
from mkosi.remove import unlink_try_hard
from mkosi.state import MkosiState
-from mkosi.util import Distribution, MkosiConfig
+from mkosi.util import Distribution
def move_rpm_db(root: Path) -> None:
# SPDX-License-Identifier: LGPL-2.1+
import dataclasses
-import functools
import json
from datetime import datetime
from pathlib import Path
from textwrap import dedent
from typing import IO, Any, Optional
+from mkosi.config import MkosiConfig
from mkosi.run import run
-from mkosi.util import Distribution, ManifestFormat, MkosiConfig, PackageType
+from mkosi.util import Distribution, ManifestFormat, PackageType
@dataclasses.dataclass
for package in self.source_packages.values():
print(f"\n{80*'-'}\n", file=out)
out.write(package.report())
-
-
-@functools.total_ordering
-class GenericVersion:
- def __init__(self, version: str):
- self._version = version
-
- def __eq__(self, other: object) -> bool:
- if not isinstance(other, GenericVersion):
- return False
- cmd = ["systemd-analyze", "compare-versions", self._version, "eq", other._version]
- return run(cmd, check=False).returncode == 0
-
- def __lt__(self, other: object) -> bool:
- if not isinstance(other, GenericVersion):
- return False
- cmd = ["systemd-analyze", "compare-versions", self._version, "lt", other._version]
- return run(cmd, check=False).returncode == 0
from pathlib import Path
from typing import Callable, Deque, Optional, TypeVar, Union, cast
+from mkosi.config import GenericVersion
from mkosi.log import complete_step
-from mkosi.manifest import GenericVersion
from mkosi.run import run
from mkosi.types import PathString
import importlib
from pathlib import Path
+from mkosi.config import MkosiConfig
from mkosi.distributions import DistributionInstaller
from mkosi.log import die
-from mkosi.util import MkosiConfig
@dataclasses.dataclass
# SPDX-License-Identifier: LGPL-2.1+
-import argparse
import ast
import contextlib
import dataclasses
import getpass
import itertools
import os
-import platform
import pwd
import re
import resource
import shutil
import sys
import tarfile
+import tempfile
from collections.abc import Iterable, Iterator, Sequence
from pathlib import Path
-from typing import Any, Callable, Optional, TypeVar, Union
+from typing import Any, Callable, Optional, TypeVar
from mkosi.log import die
changelog = "changelog" # human-readable text file with package changelogs
-KNOWN_SUFFIXES = {
- ".xz",
- ".zstd",
- ".raw",
- ".tar",
- ".cpio",
-}
-
-
-def strip_suffixes(path: Path) -> Path:
- while path.suffix in KNOWN_SUFFIXES:
- path = path.with_suffix("")
- return path
-
-
-@dataclasses.dataclass(frozen=True)
-class MkosiConfig:
- """Type-hinted storage for command line arguments.
-
- Only user configuration is stored here while dynamic state exists in
- MkosiState. If a field of the same name exists in both classes always
- access the value from state.
- """
-
- verb: Verb
- cmdline: list[str]
- force: int
-
- distribution: Distribution
- release: str
- mirror: Optional[str]
- local_mirror: Optional[str]
- repository_key_check: bool
- repositories: list[str]
- repo_dirs: list[Path]
- repart_dirs: list[Path]
- overlay: bool
- architecture: str
- output_format: OutputFormat
- manifest_format: list[ManifestFormat]
- output: Path
- output_dir: Optional[Path]
- kernel_command_line: list[str]
- secure_boot: bool
- secure_boot_key: Optional[Path]
- secure_boot_certificate: Optional[Path]
- secure_boot_valid_days: str
- secure_boot_common_name: str
- sign_expected_pcr: bool
- compress_output: Compression
- image_version: Optional[str]
- image_id: Optional[str]
- tar_strip_selinux_context: bool
- incremental: bool
- cache_initrd: bool
- packages: list[str]
- remove_packages: list[str]
- with_docs: bool
- with_tests: bool
- cache_dir: Optional[Path]
- base_trees: list[Path]
- extra_trees: list[tuple[Path, Optional[Path]]]
- skeleton_trees: list[tuple[Path, Optional[Path]]]
- clean_package_metadata: Optional[bool]
- remove_files: list[str]
- environment: dict[str, str]
- build_sources: Path
- build_dir: Optional[Path]
- install_dir: Optional[Path]
- build_packages: list[str]
- build_script: Optional[Path]
- prepare_script: Optional[Path]
- postinst_script: Optional[Path]
- finalize_script: Optional[Path]
- with_network: bool
- cache_only: bool
- nspawn_settings: Optional[Path]
- checksum: bool
- split_artifacts: bool
- sign: bool
- key: Optional[str]
- password: Optional[str]
- password_is_hashed: bool
- autologin: bool
- extra_search_paths: list[Path]
- ephemeral: bool
- ssh: bool
- credentials: dict[str, str]
- directory: Optional[Path]
- debug: list[str]
- auto_bump: bool
- workspace_dir: Optional[Path]
- initrds: list[Path]
- make_initrd: bool
- kernel_command_line_extra: list[str]
- acl: bool
- pager: bool
- bootable: Optional[bool]
-
- # QEMU-specific options
- qemu_gui: bool
- qemu_smp: str
- qemu_mem: str
- qemu_kvm: bool
- qemu_args: Sequence[str]
-
- passphrase: Optional[Path]
-
- def architecture_is_native(self) -> bool:
- return self.architecture == platform.machine()
-
- @property
- def output_split_uki(self) -> Path:
- return build_auxiliary_output_path(self, ".efi")
-
- @property
- def output_split_kernel(self) -> Path:
- return build_auxiliary_output_path(self, ".vmlinuz")
-
- @property
- def output_nspawn_settings(self) -> Path:
- return build_auxiliary_output_path(self, ".nspawn")
-
- @property
- def output_checksum(self) -> Path:
- return Path("SHA256SUMS")
-
- @property
- def output_signature(self) -> Path:
- return Path("SHA256SUMS.gpg")
-
- @property
- def output_sshkey(self) -> Path:
- return build_auxiliary_output_path(self, ".ssh")
-
- @property
- def output_manifest(self) -> Path:
- return build_auxiliary_output_path(self, ".manifest")
-
- @property
- def output_changelog(self) -> Path:
- return build_auxiliary_output_path(self, ".changelog")
-
- @property
- def output_compressed(self) -> Path:
- if self.compress_output == Compression.none:
- return self.output
-
- return self.output.parent / f"{self.output.name}.{self.compress_output.name}"
-
- def output_paths(self) -> tuple[Path, ...]:
- return (
- self.output,
- self.output_split_uki,
- self.output_split_kernel,
- self.output_nspawn_settings,
- self.output_checksum,
- self.output_signature,
- self.output_sshkey,
- self.output_manifest,
- self.output_changelog,
- )
-
-
-def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path:
- output = strip_suffixes(args.output)
- return output.with_name(f"{output.name}{suffix}")
-
def format_rlimit(rlimit: int) -> str:
limits = resource.getrlimit(rlimit)
yield
finally:
os.chdir(old)
+
+
+@contextlib.contextmanager
+def prepend_to_environ_path(paths: Sequence[Path]) -> Iterator[None]:
+ if not paths:
+ yield
+ return
+
+ with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d:
+
+ for path in paths:
+ if not path.is_dir():
+ Path(d).joinpath(path.name).symlink_to(path.absolute())
+
+ paths = [Path(d), *paths]
+
+ news = [os.fspath(path) for path in paths if path.is_dir()]
+ olds = os.getenv("PATH", "").split(":")
+ os.environ["PATH"] = ":".join(news + olds)
+
+ yield
+
+
+def qemu_check_kvm_support() -> bool:
+ kvm = Path("/dev/kvm")
+ if not kvm.is_char_device():
+ return False
+ # some CI runners may present a non-working KVM device
+ try:
+ with kvm.open("r+b"):
+ return True
+ except OSError:
+ return False
--- /dev/null
+# SPDX-License-Identifier: LGPL-2.1+
+
+from pathlib import Path
+
+from mkosi.config import strip_suffixes
+
+
+def test_strip_suffixes() -> None:
+ assert strip_suffixes(Path("home/test.zstd")) == Path("home/test")
+ assert strip_suffixes(Path("home/test.xz")) == Path("home/test")
+ assert strip_suffixes(Path("home/test.raw")) == Path("home/test")
+ assert strip_suffixes(Path("home/test.tar")) == Path("home/test")
+ assert strip_suffixes(Path("home/test.cpio")) == Path("home/test")
+ assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test")
+ assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test")
+ assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")
import pytest
-import mkosi
-from mkosi.util import Compression, Distribution, MkosiConfig, Verb
-from mkosi.config import MkosiConfigParser
+from mkosi.util import Compression, Distribution, Verb
+from mkosi.config import MkosiConfigParser, MkosiConfig, load_args
@contextmanager
def parse(argv: Optional[List[str]] = None) -> MkosiConfig:
- return mkosi.load_args(MkosiConfigParser().parse(argv))
+ return load_args(MkosiConfigParser().parse(argv))
def test_parse_load_verb() -> None:
PackageType,
safe_tar_extract,
set_umask,
- strip_suffixes,
)
def test_distribution() -> None:
assert not (evil_target / name).exists()
assert not (Path("/tmp") / name).exists()
-
-def test_strip_suffixes() -> None:
- assert strip_suffixes(Path("home/test.zstd")) == Path("home/test")
- assert strip_suffixes(Path("home/test.xz")) == Path("home/test")
- assert strip_suffixes(Path("home/test.raw")) == Path("home/test")
- assert strip_suffixes(Path("home/test.tar")) == Path("home/test")
- assert strip_suffixes(Path("home/test.cpio")) == Path("home/test")
- assert strip_suffixes(Path("home.xz/test.xz")) == Path("home.xz/test")
- assert strip_suffixes(Path("home.xz/test")) == Path("home.xz/test")
- assert strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")