]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Split MkosiArgs from MkosiConfig
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Tue, 25 Apr 2023 09:21:50 +0000 (11:21 +0200)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Tue, 25 Apr 2023 15:53:21 +0000 (17:53 +0200)
Let's put the arguments that are not associated with an image into
a separate dataclass.

mkosi/__init__.py
mkosi/__main__.py
mkosi/config.py
tests/test_parse_load_args.py

index 0a52e8bea0ea3ccc8881f73709a3d74ef0608e8e..a2a6f414e7b6f7d20c855d47f84d62cd6305273b 100644 (file)
@@ -24,7 +24,7 @@ from pathlib import Path
 from textwrap import dedent
 from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast
 
-from mkosi.config import GenericVersion, MkosiConfig, machine_name
+from mkosi.config import GenericVersion, MkosiArgs, MkosiConfig, machine_name
 from mkosi.install import add_dropin_config_from_resource, copy_path, flock
 from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step
 from mkosi.manifest import Manifest
@@ -932,7 +932,7 @@ def empty_directory(path: Path) -> None:
         pass
 
 
-def unlink_output(config: MkosiConfig) -> None:
+def unlink_output(args: MkosiArgs, config: MkosiConfig) -> None:
     with complete_step("Removing output files…"):
         if config.output.parent.exists():
             for p in config.output.parent.iterdir():
@@ -970,12 +970,12 @@ def unlink_output(config: MkosiConfig) -> None:
     # remove the downloaded package cache if the user specified one
     # additional "--force".
 
-    if config.verb == Verb.clean:
-        remove_build_cache = config.force > 0
-        remove_package_cache = config.force > 1
+    if args.verb == Verb.clean:
+        remove_build_cache = args.force > 0
+        remove_package_cache = args.force > 1
     else:
-        remove_build_cache = config.force > 1
-        remove_package_cache = config.force > 2
+        remove_build_cache = args.force > 1
+        remove_package_cache = args.force > 2
 
     if remove_build_cache:
         with complete_step("Removing incremental cache files…"):
@@ -1121,7 +1121,7 @@ def line_join_source_target_list(array: Sequence[tuple[Path, Optional[Path]]]) -
     return "\n                            ".join(items)
 
 
-def print_summary(config: MkosiConfig) -> None:
+def print_summary(args: MkosiArgs, config: MkosiConfig) -> None:
     b = Style.bold
     e = Style.reset
     bold: Callable[..., str] = lambda s: f"{b}{s}{e}"
@@ -1131,8 +1131,8 @@ def print_summary(config: MkosiConfig) -> None:
 
     summary = f"""\
 {bold("COMMANDS")}:
-                      verb: {bold(config.verb)}
-                   cmdline: {bold(" ".join(config.cmdline))}
+                      verb: {bold(args.verb)}
+                   cmdline: {bold(" ".join(args.cmdline))}
 
 {bold("DISTRIBUTION")}
               Distribution: {bold(config.distribution.name)}
@@ -1201,7 +1201,7 @@ def print_summary(config: MkosiConfig) -> None:
                    GPG Key: ({"default" if config.key is None else config.key})
         """
 
-    page(summary, config.pager)
+    page(summary, args.pager)
 
 
 def make_output_dir(state: MkosiState) -> None:
@@ -1577,11 +1577,11 @@ def run_build_script(state: MkosiState) -> None:
                               stdout=sys.stdout, env=env | state.environment)
 
 
-def need_cache_tree(state: MkosiState) -> bool:
+def need_cache_tree(args: MkosiArgs, state: MkosiState) -> bool:
     if not state.config.incremental:
         return False
 
-    if state.config.force > 1:
+    if args.force > 1:
         return True
 
     final, build = cache_tree_paths(state.config)
@@ -1589,7 +1589,7 @@ def need_cache_tree(state: MkosiState) -> bool:
     return not final.exists() or (state.config.build_script is not None and not build.exists())
 
 
-def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None:
+def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> None:
     workspace = tempfile.TemporaryDirectory(dir=config.workspace_dir or Path.cwd(), prefix=".mkosi.tmp")
     workspace_dir = Path(workspace.name)
     cache = config.cache_dir or workspace_dir / "cache"
@@ -1613,7 +1613,7 @@ def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None:
     # while we are working on it.
     with flock(workspace_dir), workspace:
         # If caching is requested, then make sure we have cache trees around we can make use of
-        if need_cache_tree(state):
+        if need_cache_tree(args, state):
             with complete_step("Building cache image"):
                 build_image(state, for_cache=True)
                 save_cache(state)
@@ -1673,7 +1673,7 @@ def nspawn_knows_arg(arg: str) -> bool:
     return "unrecognized option" not in c.stderr
 
 
-def run_shell(config: MkosiConfig) -> None:
+def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
     cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
 
     if config.output_format in (OutputFormat.directory, OutputFormat.subvolume):
@@ -1689,7 +1689,7 @@ def run_shell(config: MkosiConfig) -> None:
     if config.nspawn_settings is not None:
         cmdline += ["--settings=trusted"]
 
-    if config.verb == Verb.boot:
+    if args.verb == Verb.boot:
         cmdline += ["--boot"]
     else:
         cmdline += [f"--rlimit=RLIMIT_CORE={format_rlimit(resource.RLIMIT_CORE)}"]
@@ -1708,16 +1708,16 @@ def run_shell(config: MkosiConfig) -> None:
     for k, v in config.credentials.items():
         cmdline += [f"--set-credential={k}:{v}"]
 
-    if config.verb == Verb.boot:
+    if args.verb == Verb.boot:
         # Add nspawn options first since systemd-nspawn ignores all options after the first argument.
-        cmdline += config.cmdline
+        cmdline += args.cmdline
         # kernel cmdline config of the form systemd.xxx= get interpreted by systemd when running in nspawn as
         # well.
         cmdline += config.kernel_command_line
         cmdline += config.kernel_command_line_extra
-    elif config.cmdline:
+    elif args.cmdline:
         cmdline += ["--"]
-        cmdline += config.cmdline
+        cmdline += args.cmdline
 
     uid = InvokingUser.uid()
 
@@ -1854,7 +1854,7 @@ def start_swtpm() -> Iterator[Optional[Path]]:
             swtpm_proc.wait()
 
 
-def run_qemu(config: MkosiConfig) -> None:
+def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
     accel = "kvm" if config.qemu_kvm else "tcg"
 
     firmware, fw_supports_sb = find_qemu_firmware(config)
@@ -1941,7 +1941,7 @@ def run_qemu(config: MkosiConfig) -> None:
         # Debian images fail to boot with virtio-scsi, see: https://github.com/systemd/mkosi/issues/725
         if config.output_format == OutputFormat.cpio:
             kernel = (config.output_dir or Path.cwd()) / config.output_split_kernel
-            if not kernel.exists() and "-kernel" not in config.cmdline:
+            if not kernel.exists() and "-kernel" not in args.cmdline:
                 die("No kernel found, please install a kernel in the cpio or provide a -kernel argument to mkosi qemu")
             cmdline += ["-kernel", kernel,
                         "-initrd", fname,
@@ -1964,12 +1964,12 @@ def run_qemu(config: MkosiConfig) -> None:
                 cmdline += ["-device", "tpm-tis-device,tpmdev=tpm0"]
 
         cmdline += config.qemu_args
-        cmdline += config.cmdline
+        cmdline += args.cmdline
 
         run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
 
 
-def run_ssh(config: MkosiConfig) -> None:
+def run_ssh(args: MkosiArgs, config: MkosiConfig) -> None:
     cmd = [
         "ssh",
         # Silence known hosts file errors/warnings.
@@ -1980,7 +1980,7 @@ def run_ssh(config: MkosiConfig) -> None:
         "root@mkosi",
     ]
 
-    cmd += config.cmdline
+    cmd += args.cmdline
 
     run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
 
@@ -1998,7 +1998,7 @@ def run_serve(config: MkosiConfig) -> None:
         httpd.serve_forever()
 
 
-def generate_secure_boot_key(config: MkosiConfig) -> None:
+def generate_secure_boot_key(args: MkosiArgs, config: MkosiConfig) -> None:
     """Generate secure boot keys using openssl"""
 
     keylength = 2048
@@ -2006,7 +2006,7 @@ def generate_secure_boot_key(config: MkosiConfig) -> None:
     cn = expand_specifier(config.secure_boot_common_name)
 
     for f in (config.secure_boot_key, config.secure_boot_certificate):
-        if f and not config.force:
+        if f and not args.force:
             die(f"{f} already exists",
                 hint=("To generate new secure boot keys, "
                       f"first remove {config.secure_boot_key} {config.secure_boot_certificate}"))
@@ -2065,43 +2065,43 @@ def expand_specifier(s: str) -> str:
     return s.replace("%u", InvokingUser.name())
 
 
-def needs_build(config: Union[argparse.Namespace, MkosiConfig]) -> bool:
-    return config.verb == Verb.build or (config.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or config.force > 0))
+def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
+    return args.verb == Verb.build or (args.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or args.force > 0))
 
 
-def run_verb(config: MkosiConfig) -> None:
+def run_verb(args: MkosiArgs, config: MkosiConfig) -> None:
     with prepend_to_environ_path(config.extra_search_paths):
-        if config.verb == Verb.genkey:
-            return generate_secure_boot_key(config)
+        if args.verb == Verb.genkey:
+            return generate_secure_boot_key(args, config)
 
-        if config.verb == Verb.bump:
+        if args.verb == Verb.bump:
             return bump_image_version(config)
 
-        if config.verb == Verb.summary:
-            return print_summary(config)
+        if args.verb == Verb.summary:
+            return print_summary(args, config)
 
-        if config.verb in MKOSI_COMMANDS_SUDO:
+        if args.verb in MKOSI_COMMANDS_SUDO:
             check_root()
 
-        if config.verb == Verb.build:
+        if args.verb == Verb.build:
             check_inputs(config)
 
-            if not config.force:
+            if not args.force:
                 check_outputs(config)
 
-        if needs_build(config) or config.verb == Verb.clean:
+        if needs_build(args, config) or args.verb == Verb.clean:
             def target() -> None:
                 become_root()
-                unlink_output(config)
+                unlink_output(args, config)
 
             fork_and_wait(target)
 
-        if needs_build(config):
+        if needs_build(args, config):
             def target() -> None:
                 # Get the user UID/GID either on the host or in the user namespace running the build
                 uid, gid = become_root()
                 init_mount_namespace()
-                build_stuff(uid, gid, config)
+                build_stuff(uid, gid, args, config)
 
             # We only want to run the build in a user namespace but not the following steps. Since we can't
             # rejoin the parent user namespace after unsharing from it, let's run the build in a fork so that
@@ -2111,14 +2111,14 @@ def run_verb(config: MkosiConfig) -> None:
             if config.auto_bump:
                 bump_image_version(config)
 
-        if config.verb in (Verb.shell, Verb.boot):
-            run_shell(config)
+        if args.verb in (Verb.shell, Verb.boot):
+            run_shell(args, config)
 
-        if config.verb == Verb.qemu:
-            run_qemu(config)
+        if args.verb == Verb.qemu:
+            run_qemu(args, config)
 
-        if config.verb == Verb.ssh:
-            run_ssh(config)
+        if args.verb == Verb.ssh:
+            run_ssh(args, config)
 
-        if config.verb == Verb.serve:
+        if args.verb == Verb.serve:
             run_serve(config)
index e701dc6ce4f3584737d75317a223c29e80401246..65ddc73d2b670398659643ef6bf17da9bcc36b4e 100644 (file)
@@ -42,18 +42,18 @@ def propagate_failed_return() -> Iterator[None]:
 @propagate_failed_return()
 def main() -> None:
     log_setup()
-    config = MkosiConfigParser().parse()
+    args, config = MkosiConfigParser().parse()
 
     if ARG_DEBUG.get():
         logging.getLogger().setLevel(logging.DEBUG)
 
-    if config.directory:
-        if config.directory.is_dir():
-            os.chdir(config.directory)
+    if args.directory:
+        if args.directory.is_dir():
+            os.chdir(args.directory)
         else:
-            die(f"Error: {config.directory} is not a directory!")
+            die(f"Error: {args.directory} is not a directory!")
 
-    run_verb(config)
+    run_verb(args, config)
 
 
 if __name__ == "__main__":
index dc3879eaef40b08ceea3a9b335cd65dcc47ad82e..23ad5bf87f4e2e6bbfd8b734ecfa133f1c484e1b 100644 (file)
@@ -4,6 +4,7 @@ import dataclasses
 import enum
 import fnmatch
 import functools
+import inspect
 import logging
 import operator
 import os.path
@@ -480,6 +481,24 @@ class PagerHelpAction(argparse._HelpAction):
         parser.exit()
 
 
+@dataclasses.dataclass(frozen=True)
+class MkosiArgs:
+    verb: Verb
+    cmdline: list[str]
+    force: int
+    directory: Optional[Path]
+    debug: bool
+    debug_shell: bool
+    pager: bool
+
+    @classmethod
+    def from_namespace(cls, ns: argparse.Namespace) -> "MkosiArgs":
+        return cls(**{
+            k: v for k, v in vars(ns).items()
+            if k in inspect.signature(cls).parameters
+        })
+
+
 @dataclasses.dataclass(frozen=True)
 class MkosiConfig:
     """Type-hinted storage for command line arguments.
@@ -489,10 +508,6 @@ class MkosiConfig:
     access the value from state.
     """
 
-    verb: Verb
-    cmdline: list[str]
-    force: int
-
     distribution: Distribution
     release: str
     mirror: Optional[str]
@@ -552,16 +567,12 @@ class MkosiConfig:
     ephemeral: bool
     ssh: bool
     credentials: dict[str, str]
-    directory: Optional[Path]
-    debug: bool
-    debug_shell: bool
     auto_bump: bool
     workspace_dir: Optional[Path]
     initrds: list[Path]
     make_initrd: bool
     kernel_command_line_extra: list[str]
     acl: bool
-    pager: bool
     bootable: Optional[bool]
 
     # QEMU-specific options
@@ -573,6 +584,13 @@ class MkosiConfig:
 
     passphrase: Optional[Path]
 
+    @classmethod
+    def from_namespace(cls, ns: argparse.Namespace) -> "MkosiConfig":
+        return cls(**{
+            k: v for k, v in vars(ns).items()
+            if k in inspect.signature(cls).parameters
+        })
+
     def architecture_is_native(self) -> bool:
         return self.architecture == platform.machine()
 
@@ -1166,6 +1184,7 @@ class MkosiConfigParser:
             help="Change to specified directory before doing anything",
             type=Path,
             metavar="PATH",
+            default=None,
         )
         parser.add_argument(
             "--debug",
@@ -1656,43 +1675,41 @@ class MkosiConfigParser:
 
         return parser
 
-    def parse(self, args: Optional[Sequence[str]] = None) -> MkosiConfig:
+    def parse(self, argv: Optional[Sequence[str]] = None) -> tuple[MkosiArgs, MkosiConfig]:
         namespace = argparse.Namespace()
 
-        if args is None:
-            args = sys.argv[1:]
-        args = list(args)
+        if argv is None:
+            argv = sys.argv[1:]
+        argv = list(argv)
 
         # Make sure the verb command gets explicitly passed. Insert a -- before the positional verb argument
         # otherwise it might be considered as an argument of a parameter with nargs='?'. For example mkosi -i
         # summary would be treated as -i=summary.
         for verb in Verb:
             try:
-                v_i = args.index(verb.name)
+                v_i = argv.index(verb.name)
             except ValueError:
                 continue
 
-            if v_i > 0 and args[v_i - 1] != "--":
-                args.insert(v_i, "--")
+            if v_i > 0 and argv[v_i - 1] != "--":
+                argv.insert(v_i, "--")
             break
         else:
-            args += ["--", "build"]
+            argv += ["--", "build"]
 
         argparser = self.create_argument_parser()
-        argparser.parse_args(args, namespace)
+        argparser.parse_args(argv, namespace)
 
-        if namespace.verb == Verb.help:
-            PagerHelpAction.__call__(None, argparser, namespace)  # type: ignore
+        args = load_args(namespace)
 
-        if "directory" not in namespace:
-            setattr(namespace, "directory", None)
+        if args.verb == Verb.help:
+            PagerHelpAction.__call__(None, argparser, namespace)  # type: ignore
 
-        if namespace.directory and not namespace.directory.is_dir():
+        if args.directory and not args.directory.is_dir():
             die(f"Error: {namespace.directory} is not a directory!")
 
-        p = namespace.directory or Path.cwd()
-        with chdir(p):
-            self.parse_config(namespace.directory or Path.cwd(), namespace)
+        with chdir(args.directory or Path.cwd()):
+            self.parse_config(Path("."), namespace)
 
         for s in self.SETTINGS:
             if s.dest in namespace:
@@ -1707,7 +1724,7 @@ class MkosiConfigParser:
 
             setattr(namespace, s.dest, default)
 
-        return load_args(namespace)
+        return args, load_config(namespace)
 
 
 class GenericVersion:
@@ -1873,10 +1890,14 @@ def load_kernel_command_line_extra(args: argparse.Namespace) -> list[str]:
     return cmdline
 
 
-def load_args(args: argparse.Namespace) -> MkosiConfig:
+def load_args(args: argparse.Namespace) -> MkosiArgs:
     ARG_DEBUG.set(args.debug)
     ARG_DEBUG_SHELL.set(args.debug_shell)
 
+    return MkosiArgs.from_namespace(args)
+
+
+def load_config(args: argparse.Namespace) -> MkosiConfig:
     find_image_version(args)
 
     if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
@@ -1997,4 +2018,5 @@ def load_args(args: argparse.Namespace) -> MkosiConfig:
         if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
             die("This unprivileged build configuration requires at least Linux v5.11")
 
-    return MkosiConfig(**vars(args))
+    return MkosiConfig.from_namespace(args)
+
index 9a58bf0c8ee80e301b0af15b1bc6462c1bc7fefd..9eebe3eefa5460f3348c873b4461c0d181e6c3a9 100644 (file)
@@ -13,7 +13,7 @@ from typing import Iterator, List, Optional
 import pytest
 
 from mkosi.util import Compression, Distribution, Verb
-from mkosi.config import MkosiConfigParser, MkosiConfig
+from mkosi.config import MkosiConfigParser, MkosiConfig, MkosiArgs
 
 
 @contextmanager
@@ -28,23 +28,23 @@ def cd_temp_dir() -> Iterator[None]:
             chdir(old_dir)
 
 
-def parse(argv: Optional[List[str]] = None) -> MkosiConfig:
+def parse(argv: Optional[List[str]] = None) -> tuple[MkosiArgs, MkosiConfig]:
     return MkosiConfigParser().parse(argv)
 
 
 def test_parse_load_verb() -> None:
     with cd_temp_dir():
-        assert parse(["build"]).verb == Verb.build
-        assert parse(["clean"]).verb == Verb.clean
+        assert parse(["build"])[0].verb == Verb.build
+        assert parse(["clean"])[0].verb == Verb.clean
         with pytest.raises(SystemExit):
             parse(["help"])
-        assert parse(["genkey"]).verb == Verb.genkey
-        assert parse(["bump"]).verb == Verb.bump
-        assert parse(["serve"]).verb == Verb.serve
-        assert parse(["build"]).verb == Verb.build
-        assert parse(["shell"]).verb == Verb.shell
-        assert parse(["boot"]).verb == Verb.boot
-        assert parse(["qemu"]).verb == Verb.qemu
+        assert parse(["genkey"])[0].verb == Verb.genkey
+        assert parse(["bump"])[0].verb == Verb.bump
+        assert parse(["serve"])[0].verb == Verb.serve
+        assert parse(["build"])[0].verb == Verb.build
+        assert parse(["shell"])[0].verb == Verb.shell
+        assert parse(["boot"])[0].verb == Verb.boot
+        assert parse(["qemu"])[0].verb == Verb.qemu
         with pytest.raises(SystemExit):
             parse(["invalid"])
 
@@ -52,7 +52,7 @@ def test_parse_load_verb() -> None:
 def test_os_distribution() -> None:
     with cd_temp_dir():
         for dist in Distribution:
-            assert parse(["-d", dist.name]).distribution == dist
+            assert parse(["-d", dist.name])[1].distribution == dist
 
         with pytest.raises(tuple((argparse.ArgumentError, SystemExit))):
             parse(["-d", "invalidDistro"])
@@ -62,7 +62,7 @@ def test_os_distribution() -> None:
         for dist in Distribution:
             config = Path("mkosi.conf")
             config.write_text(f"[Distribution]\nDistribution={dist}")
-            assert parse([]).distribution == dist
+            assert parse([])[1].distribution == dist
 
 
 def test_parse_config_files_filter() -> None:
@@ -73,7 +73,7 @@ def test_parse_config_files_filter() -> None:
         (confd / "10-file.conf").write_text("[Content]\nPackages=yes")
         (confd / "20-file.noconf").write_text("[Content]\nPackages=nope")
 
-        assert parse([]).packages == ["yes"]
+        assert parse([])[1].packages == ["yes"]
 
 
 def test_shell_boot() -> None:
@@ -90,7 +90,7 @@ def test_shell_boot() -> None:
 
 def test_compression() -> None:
     with cd_temp_dir():
-        assert parse(["--format", "disk", "--compress-output", "False"]).compress_output == Compression.none
+        assert parse(["--format", "disk", "--compress-output", "False"])[1].compress_output == Compression.none
 
 
 @pytest.mark.parametrize("dist1,dist2", itertools.combinations_with_replacement(Distribution, 2))