From: Daan De Meyer Date: Tue, 25 Apr 2023 09:21:50 +0000 (+0200) Subject: Split MkosiArgs from MkosiConfig X-Git-Tag: v15~196^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c74c231d825d4b3549d7013cc77fd0ee40e79b3e;p=thirdparty%2Fmkosi.git Split MkosiArgs from MkosiConfig Let's put the arguments that are not associated with an image into a separate dataclass. --- diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 0a52e8bea..a2a6f414e 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -24,7 +24,7 @@ from pathlib import Path from textwrap import dedent from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast -from mkosi.config import GenericVersion, MkosiConfig, machine_name +from mkosi.config import GenericVersion, MkosiArgs, MkosiConfig, machine_name from mkosi.install import add_dropin_config_from_resource, copy_path, flock from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step from mkosi.manifest import Manifest @@ -932,7 +932,7 @@ def empty_directory(path: Path) -> None: pass -def unlink_output(config: MkosiConfig) -> None: +def unlink_output(args: MkosiArgs, config: MkosiConfig) -> None: with complete_step("Removing output files…"): if config.output.parent.exists(): for p in config.output.parent.iterdir(): @@ -970,12 +970,12 @@ def unlink_output(config: MkosiConfig) -> None: # remove the downloaded package cache if the user specified one # additional "--force". - if config.verb == Verb.clean: - remove_build_cache = config.force > 0 - remove_package_cache = config.force > 1 + if args.verb == Verb.clean: + remove_build_cache = args.force > 0 + remove_package_cache = args.force > 1 else: - remove_build_cache = config.force > 1 - remove_package_cache = config.force > 2 + remove_build_cache = args.force > 1 + remove_package_cache = args.force > 2 if remove_build_cache: with complete_step("Removing incremental cache files…"): @@ -1121,7 +1121,7 @@ def line_join_source_target_list(array: Sequence[tuple[Path, Optional[Path]]]) - return "\n ".join(items) -def print_summary(config: MkosiConfig) -> None: +def print_summary(args: MkosiArgs, config: MkosiConfig) -> None: b = Style.bold e = Style.reset bold: Callable[..., str] = lambda s: f"{b}{s}{e}" @@ -1131,8 +1131,8 @@ def print_summary(config: MkosiConfig) -> None: summary = f"""\ {bold("COMMANDS")}: - verb: {bold(config.verb)} - cmdline: {bold(" ".join(config.cmdline))} + verb: {bold(args.verb)} + cmdline: {bold(" ".join(args.cmdline))} {bold("DISTRIBUTION")} Distribution: {bold(config.distribution.name)} @@ -1201,7 +1201,7 @@ def print_summary(config: MkosiConfig) -> None: GPG Key: ({"default" if config.key is None else config.key}) """ - page(summary, config.pager) + page(summary, args.pager) def make_output_dir(state: MkosiState) -> None: @@ -1577,11 +1577,11 @@ def run_build_script(state: MkosiState) -> None: stdout=sys.stdout, env=env | state.environment) -def need_cache_tree(state: MkosiState) -> bool: +def need_cache_tree(args: MkosiArgs, state: MkosiState) -> bool: if not state.config.incremental: return False - if state.config.force > 1: + if args.force > 1: return True final, build = cache_tree_paths(state.config) @@ -1589,7 +1589,7 @@ def need_cache_tree(state: MkosiState) -> bool: return not final.exists() or (state.config.build_script is not None and not build.exists()) -def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None: +def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> None: workspace = tempfile.TemporaryDirectory(dir=config.workspace_dir or Path.cwd(), prefix=".mkosi.tmp") workspace_dir = Path(workspace.name) cache = config.cache_dir or workspace_dir / "cache" @@ -1613,7 +1613,7 @@ def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None: # while we are working on it. with flock(workspace_dir), workspace: # If caching is requested, then make sure we have cache trees around we can make use of - if need_cache_tree(state): + if need_cache_tree(args, state): with complete_step("Building cache image"): build_image(state, for_cache=True) save_cache(state) @@ -1673,7 +1673,7 @@ def nspawn_knows_arg(arg: str) -> bool: return "unrecognized option" not in c.stderr -def run_shell(config: MkosiConfig) -> None: +def run_shell(args: MkosiArgs, config: MkosiConfig) -> None: cmdline: list[PathString] = ["systemd-nspawn", "--quiet"] if config.output_format in (OutputFormat.directory, OutputFormat.subvolume): @@ -1689,7 +1689,7 @@ def run_shell(config: MkosiConfig) -> None: if config.nspawn_settings is not None: cmdline += ["--settings=trusted"] - if config.verb == Verb.boot: + if args.verb == Verb.boot: cmdline += ["--boot"] else: cmdline += [f"--rlimit=RLIMIT_CORE={format_rlimit(resource.RLIMIT_CORE)}"] @@ -1708,16 +1708,16 @@ def run_shell(config: MkosiConfig) -> None: for k, v in config.credentials.items(): cmdline += [f"--set-credential={k}:{v}"] - if config.verb == Verb.boot: + if args.verb == Verb.boot: # Add nspawn options first since systemd-nspawn ignores all options after the first argument. - cmdline += config.cmdline + cmdline += args.cmdline # kernel cmdline config of the form systemd.xxx= get interpreted by systemd when running in nspawn as # well. cmdline += config.kernel_command_line cmdline += config.kernel_command_line_extra - elif config.cmdline: + elif args.cmdline: cmdline += ["--"] - cmdline += config.cmdline + cmdline += args.cmdline uid = InvokingUser.uid() @@ -1854,7 +1854,7 @@ def start_swtpm() -> Iterator[Optional[Path]]: swtpm_proc.wait() -def run_qemu(config: MkosiConfig) -> None: +def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None: accel = "kvm" if config.qemu_kvm else "tcg" firmware, fw_supports_sb = find_qemu_firmware(config) @@ -1941,7 +1941,7 @@ def run_qemu(config: MkosiConfig) -> None: # Debian images fail to boot with virtio-scsi, see: https://github.com/systemd/mkosi/issues/725 if config.output_format == OutputFormat.cpio: kernel = (config.output_dir or Path.cwd()) / config.output_split_kernel - if not kernel.exists() and "-kernel" not in config.cmdline: + if not kernel.exists() and "-kernel" not in args.cmdline: die("No kernel found, please install a kernel in the cpio or provide a -kernel argument to mkosi qemu") cmdline += ["-kernel", kernel, "-initrd", fname, @@ -1964,12 +1964,12 @@ def run_qemu(config: MkosiConfig) -> None: cmdline += ["-device", "tpm-tis-device,tpmdev=tpm0"] cmdline += config.qemu_args - cmdline += config.cmdline + cmdline += args.cmdline run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False) -def run_ssh(config: MkosiConfig) -> None: +def run_ssh(args: MkosiArgs, config: MkosiConfig) -> None: cmd = [ "ssh", # Silence known hosts file errors/warnings. @@ -1980,7 +1980,7 @@ def run_ssh(config: MkosiConfig) -> None: "root@mkosi", ] - cmd += config.cmdline + cmd += args.cmdline run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False) @@ -1998,7 +1998,7 @@ def run_serve(config: MkosiConfig) -> None: httpd.serve_forever() -def generate_secure_boot_key(config: MkosiConfig) -> None: +def generate_secure_boot_key(args: MkosiArgs, config: MkosiConfig) -> None: """Generate secure boot keys using openssl""" keylength = 2048 @@ -2006,7 +2006,7 @@ def generate_secure_boot_key(config: MkosiConfig) -> None: cn = expand_specifier(config.secure_boot_common_name) for f in (config.secure_boot_key, config.secure_boot_certificate): - if f and not config.force: + if f and not args.force: die(f"{f} already exists", hint=("To generate new secure boot keys, " f"first remove {config.secure_boot_key} {config.secure_boot_certificate}")) @@ -2065,43 +2065,43 @@ def expand_specifier(s: str) -> str: return s.replace("%u", InvokingUser.name()) -def needs_build(config: Union[argparse.Namespace, MkosiConfig]) -> bool: - return config.verb == Verb.build or (config.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or config.force > 0)) +def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool: + return args.verb == Verb.build or (args.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or args.force > 0)) -def run_verb(config: MkosiConfig) -> None: +def run_verb(args: MkosiArgs, config: MkosiConfig) -> None: with prepend_to_environ_path(config.extra_search_paths): - if config.verb == Verb.genkey: - return generate_secure_boot_key(config) + if args.verb == Verb.genkey: + return generate_secure_boot_key(args, config) - if config.verb == Verb.bump: + if args.verb == Verb.bump: return bump_image_version(config) - if config.verb == Verb.summary: - return print_summary(config) + if args.verb == Verb.summary: + return print_summary(args, config) - if config.verb in MKOSI_COMMANDS_SUDO: + if args.verb in MKOSI_COMMANDS_SUDO: check_root() - if config.verb == Verb.build: + if args.verb == Verb.build: check_inputs(config) - if not config.force: + if not args.force: check_outputs(config) - if needs_build(config) or config.verb == Verb.clean: + if needs_build(args, config) or args.verb == Verb.clean: def target() -> None: become_root() - unlink_output(config) + unlink_output(args, config) fork_and_wait(target) - if needs_build(config): + if needs_build(args, config): def target() -> None: # Get the user UID/GID either on the host or in the user namespace running the build uid, gid = become_root() init_mount_namespace() - build_stuff(uid, gid, config) + build_stuff(uid, gid, args, config) # We only want to run the build in a user namespace but not the following steps. Since we can't # rejoin the parent user namespace after unsharing from it, let's run the build in a fork so that @@ -2111,14 +2111,14 @@ def run_verb(config: MkosiConfig) -> None: if config.auto_bump: bump_image_version(config) - if config.verb in (Verb.shell, Verb.boot): - run_shell(config) + if args.verb in (Verb.shell, Verb.boot): + run_shell(args, config) - if config.verb == Verb.qemu: - run_qemu(config) + if args.verb == Verb.qemu: + run_qemu(args, config) - if config.verb == Verb.ssh: - run_ssh(config) + if args.verb == Verb.ssh: + run_ssh(args, config) - if config.verb == Verb.serve: + if args.verb == Verb.serve: run_serve(config) diff --git a/mkosi/__main__.py b/mkosi/__main__.py index e701dc6ce..65ddc73d2 100644 --- a/mkosi/__main__.py +++ b/mkosi/__main__.py @@ -42,18 +42,18 @@ def propagate_failed_return() -> Iterator[None]: @propagate_failed_return() def main() -> None: log_setup() - config = MkosiConfigParser().parse() + args, config = MkosiConfigParser().parse() if ARG_DEBUG.get(): logging.getLogger().setLevel(logging.DEBUG) - if config.directory: - if config.directory.is_dir(): - os.chdir(config.directory) + if args.directory: + if args.directory.is_dir(): + os.chdir(args.directory) else: - die(f"Error: {config.directory} is not a directory!") + die(f"Error: {args.directory} is not a directory!") - run_verb(config) + run_verb(args, config) if __name__ == "__main__": diff --git a/mkosi/config.py b/mkosi/config.py index dc3879eae..23ad5bf87 100644 --- a/mkosi/config.py +++ b/mkosi/config.py @@ -4,6 +4,7 @@ import dataclasses import enum import fnmatch import functools +import inspect import logging import operator import os.path @@ -480,6 +481,24 @@ class PagerHelpAction(argparse._HelpAction): parser.exit() +@dataclasses.dataclass(frozen=True) +class MkosiArgs: + verb: Verb + cmdline: list[str] + force: int + directory: Optional[Path] + debug: bool + debug_shell: bool + pager: bool + + @classmethod + def from_namespace(cls, ns: argparse.Namespace) -> "MkosiArgs": + return cls(**{ + k: v for k, v in vars(ns).items() + if k in inspect.signature(cls).parameters + }) + + @dataclasses.dataclass(frozen=True) class MkosiConfig: """Type-hinted storage for command line arguments. @@ -489,10 +508,6 @@ class MkosiConfig: access the value from state. """ - verb: Verb - cmdline: list[str] - force: int - distribution: Distribution release: str mirror: Optional[str] @@ -552,16 +567,12 @@ class MkosiConfig: ephemeral: bool ssh: bool credentials: dict[str, str] - directory: Optional[Path] - debug: bool - debug_shell: bool auto_bump: bool workspace_dir: Optional[Path] initrds: list[Path] make_initrd: bool kernel_command_line_extra: list[str] acl: bool - pager: bool bootable: Optional[bool] # QEMU-specific options @@ -573,6 +584,13 @@ class MkosiConfig: passphrase: Optional[Path] + @classmethod + def from_namespace(cls, ns: argparse.Namespace) -> "MkosiConfig": + return cls(**{ + k: v for k, v in vars(ns).items() + if k in inspect.signature(cls).parameters + }) + def architecture_is_native(self) -> bool: return self.architecture == platform.machine() @@ -1166,6 +1184,7 @@ class MkosiConfigParser: help="Change to specified directory before doing anything", type=Path, metavar="PATH", + default=None, ) parser.add_argument( "--debug", @@ -1656,43 +1675,41 @@ class MkosiConfigParser: return parser - def parse(self, args: Optional[Sequence[str]] = None) -> MkosiConfig: + def parse(self, argv: Optional[Sequence[str]] = None) -> tuple[MkosiArgs, MkosiConfig]: namespace = argparse.Namespace() - if args is None: - args = sys.argv[1:] - args = list(args) + if argv is None: + argv = sys.argv[1:] + argv = list(argv) # Make sure the verb command gets explicitly passed. Insert a -- before the positional verb argument # otherwise it might be considered as an argument of a parameter with nargs='?'. For example mkosi -i # summary would be treated as -i=summary. for verb in Verb: try: - v_i = args.index(verb.name) + v_i = argv.index(verb.name) except ValueError: continue - if v_i > 0 and args[v_i - 1] != "--": - args.insert(v_i, "--") + if v_i > 0 and argv[v_i - 1] != "--": + argv.insert(v_i, "--") break else: - args += ["--", "build"] + argv += ["--", "build"] argparser = self.create_argument_parser() - argparser.parse_args(args, namespace) + argparser.parse_args(argv, namespace) - if namespace.verb == Verb.help: - PagerHelpAction.__call__(None, argparser, namespace) # type: ignore + args = load_args(namespace) - if "directory" not in namespace: - setattr(namespace, "directory", None) + if args.verb == Verb.help: + PagerHelpAction.__call__(None, argparser, namespace) # type: ignore - if namespace.directory and not namespace.directory.is_dir(): + if args.directory and not args.directory.is_dir(): die(f"Error: {namespace.directory} is not a directory!") - p = namespace.directory or Path.cwd() - with chdir(p): - self.parse_config(namespace.directory or Path.cwd(), namespace) + with chdir(args.directory or Path.cwd()): + self.parse_config(Path("."), namespace) for s in self.SETTINGS: if s.dest in namespace: @@ -1707,7 +1724,7 @@ class MkosiConfigParser: setattr(namespace, s.dest, default) - return load_args(namespace) + return args, load_config(namespace) class GenericVersion: @@ -1873,10 +1890,14 @@ def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]: return cmdline -def load_args(args: argparse.Namespace) -> MkosiConfig: +def load_args(args: argparse.Namespace) -> MkosiArgs: ARG_DEBUG.set(args.debug) ARG_DEBUG_SHELL.set(args.debug_shell) + return MkosiArgs.from_namespace(args) + + +def load_config(args: argparse.Namespace) -> MkosiConfig: find_image_version(args) if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE: @@ -1997,4 +2018,5 @@ def load_args(args: argparse.Namespace) -> MkosiConfig: if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0: die("This unprivileged build configuration requires at least Linux v5.11") - return MkosiConfig(**vars(args)) + return MkosiConfig.from_namespace(args) + diff --git a/tests/test_parse_load_args.py b/tests/test_parse_load_args.py index 9a58bf0c8..9eebe3eef 100644 --- a/tests/test_parse_load_args.py +++ b/tests/test_parse_load_args.py @@ -13,7 +13,7 @@ from typing import Iterator, List, Optional import pytest from mkosi.util import Compression, Distribution, Verb -from mkosi.config import MkosiConfigParser, MkosiConfig +from mkosi.config import MkosiConfigParser, MkosiConfig, MkosiArgs @contextmanager @@ -28,23 +28,23 @@ def cd_temp_dir() -> Iterator[None]: chdir(old_dir) -def parse(argv: Optional[List[str]] = None) -> MkosiConfig: +def parse(argv: Optional[List[str]] = None) -> tuple[MkosiArgs, MkosiConfig]: return MkosiConfigParser().parse(argv) def test_parse_load_verb() -> None: with cd_temp_dir(): - assert parse(["build"]).verb == Verb.build - assert parse(["clean"]).verb == Verb.clean + assert parse(["build"])[0].verb == Verb.build + assert parse(["clean"])[0].verb == Verb.clean with pytest.raises(SystemExit): parse(["help"]) - assert parse(["genkey"]).verb == Verb.genkey - assert parse(["bump"]).verb == Verb.bump - assert parse(["serve"]).verb == Verb.serve - assert parse(["build"]).verb == Verb.build - assert parse(["shell"]).verb == Verb.shell - assert parse(["boot"]).verb == Verb.boot - assert parse(["qemu"]).verb == Verb.qemu + assert parse(["genkey"])[0].verb == Verb.genkey + assert parse(["bump"])[0].verb == Verb.bump + assert parse(["serve"])[0].verb == Verb.serve + assert parse(["build"])[0].verb == Verb.build + assert parse(["shell"])[0].verb == Verb.shell + assert parse(["boot"])[0].verb == Verb.boot + assert parse(["qemu"])[0].verb == Verb.qemu with pytest.raises(SystemExit): parse(["invalid"]) @@ -52,7 +52,7 @@ def test_parse_load_verb() -> None: def test_os_distribution() -> None: with cd_temp_dir(): for dist in Distribution: - assert parse(["-d", dist.name]).distribution == dist + assert parse(["-d", dist.name])[1].distribution == dist with pytest.raises(tuple((argparse.ArgumentError, SystemExit))): parse(["-d", "invalidDistro"]) @@ -62,7 +62,7 @@ def test_os_distribution() -> None: for dist in Distribution: config = Path("mkosi.conf") config.write_text(f"[Distribution]\nDistribution={dist}") - assert parse([]).distribution == dist + assert parse([])[1].distribution == dist def test_parse_config_files_filter() -> None: @@ -73,7 +73,7 @@ def test_parse_config_files_filter() -> None: (confd / "10-file.conf").write_text("[Content]\nPackages=yes") (confd / "20-file.noconf").write_text("[Content]\nPackages=nope") - assert parse([]).packages == ["yes"] + assert parse([])[1].packages == ["yes"] def test_shell_boot() -> None: @@ -90,7 +90,7 @@ def test_shell_boot() -> None: def test_compression() -> None: with cd_temp_dir(): - assert parse(["--format", "disk", "--compress-output", "False"]).compress_output == Compression.none + assert parse(["--format", "disk", "--compress-output", "False"])[1].compress_output == Compression.none @pytest.mark.parametrize("dist1,dist2", itertools.combinations_with_replacement(Distribution, 2))