]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Rework output related logic 1538/head
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Thu, 4 May 2023 10:47:24 +0000 (12:47 +0200)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Fri, 5 May 2023 08:26:58 +0000 (10:26 +0200)
- Let's store the output as a "base" name without any suffixes from
which we construct all other output paths.
- Let's not allow --output to be specified as a path anymore and
always put it in the configured output directory.
- Let's return all output paths as strings instead of paths. As
strings, we'll get typing errors if we try to use these as paths
without joining them with a directory first, which can be either the
output directory or the staging directory.
- Let's always create a symlink from the "base" name to the full
output path so it can be referred to regardless of the output
format, compression or image version that's used.
- Let's make sure we compress split partitions if requested

mkosi/__init__.py
mkosi/config.py

index 6b0bec5c5a8c99279a4d780658bfa57b9514e511..73e9bb84aaf267efc15156889b9a1bbded3d480e 100644 (file)
@@ -23,13 +23,7 @@ from pathlib import Path
 from textwrap import dedent
 from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast
 
-from mkosi.config import (
-    GenericVersion,
-    MkosiArgs,
-    MkosiConfig,
-    MkosiConfigParser,
-    machine_name,
-)
+from mkosi.config import GenericVersion, MkosiArgs, MkosiConfig, MkosiConfigParser
 from mkosi.install import add_dropin_config_from_resource, copy_path, flock
 from mkosi.log import Style, color_error, complete_step, die, log_step
 from mkosi.manifest import Manifest
@@ -444,7 +438,7 @@ def run_finalize_script(state: MkosiState) -> None:
 
     with complete_step("Running finalize script…"):
         run([state.config.finalize_script],
-            env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.config.output_dir or Path.cwd())})
+            env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.config.output_dir)})
 
 
 def install_boot_loader(state: MkosiState) -> None:
@@ -578,19 +572,6 @@ def gzip_binary() -> str:
     return "pigz" if shutil.which("pigz") else "gzip"
 
 
-def compressor_command(compression: Compression, src: Path) -> list[PathString]:
-    """Returns a command suitable for compressing archives."""
-
-    if compression == Compression.gz:
-        return [gzip_binary(), "--fast", src]
-    elif compression == Compression.xz:
-        return ["xz", "--check=crc32", "--fast", "-T0", src]
-    elif compression == Compression.zst:
-        return ["zstd", "-q", "-T0", "--rm", src]
-    else:
-        die(f"Unknown compression {compression}")
-
-
 def tar_binary() -> str:
     # Some distros (Mandriva) install BSD tar as "tar", hence prefer
     # "gtar" if it exists, which should be GNU tar wherever it exists.
@@ -606,11 +587,15 @@ def make_tar(state: MkosiState) -> None:
     if state.config.output_format != OutputFormat.tar:
         return
 
-    cmd: list[PathString] = [tar_binary(), "-C", state.root, "-c", "--xattrs", "--xattrs-include=*"]
-    if state.config.tar_strip_selinux_context:
-        cmd += ["--xattrs-exclude=security.selinux"]
-
-    cmd += [".", "-f", state.staging / state.config.output.name]
+    cmd: list[PathString] = [
+        tar_binary(),
+        "-C", state.root,
+        "-c", "--xattrs",
+        "--xattrs-include=*",
+        "--file", state.staging / state.config.output_with_format,
+        *(["--xattrs-exclude=security.selinux"] if state.config.tar_strip_selinux_context else []),
+        ".",
+    ]
 
     with complete_step("Creating archive…"):
         run(cmd)
@@ -626,7 +611,7 @@ def make_initrd(state: MkosiState) -> None:
     if state.config.output_format != OutputFormat.cpio:
         return
 
-    make_cpio(state.root, find_files(state.root, state.root), state.staging / state.config.output.name)
+    make_cpio(state.root, find_files(state.root, state.root), state.staging / state.config.output_with_format)
 
 
 def make_cpio(root: Path, files: Iterator[Path], output: Path) -> None:
@@ -649,7 +634,7 @@ def make_directory(state: MkosiState) -> None:
     if state.config.output_format != OutputFormat.directory:
         return
 
-    os.rename(state.root, state.staging / state.config.output.name)
+    state.root.rename(state.staging / state.config.output_with_format)
 
 
 def gen_kernel_images(state: MkosiState) -> Iterator[tuple[str, Path]]:
@@ -695,7 +680,7 @@ def install_unified_kernel(state: MkosiState, roothash: Optional[str]) -> None:
         return
 
     for kver, kimg in gen_kernel_images(state):
-        copy_path(state.root / kimg, state.staging / state.config.output_split_kernel.name)
+        copy_path(state.root / kimg, state.staging / state.config.output_split_kernel)
         break
 
     if state.config.output_format == OutputFormat.cpio and state.config.bootable is None:
@@ -738,10 +723,11 @@ def install_unified_kernel(state: MkosiState, roothash: Optional[str]) -> None:
                 "build",
             ])
 
-            unlink_output(args, presets[0])
-            build_stuff(state.uid, state.gid, args, presets[0])
+            config = presets[0]
+            unlink_output(args, config)
+            build_stuff(state.uid, state.gid, args, config)
 
-            initrds = [presets[0].output_compressed]
+            initrds = [config.output_dir / config.output]
 
     for kver, kimg in gen_kernel_images(state):
         with complete_step(f"Generating unified kernel image for {kimg}"):
@@ -813,22 +799,42 @@ def install_unified_kernel(state: MkosiState, roothash: Optional[str]) -> None:
 
             run(cmd)
 
-            if not state.staging.joinpath(state.config.output_split_uki.name).exists():
-                copy_path(boot_binary, state.staging / state.config.output_split_uki.name)
+            if not state.staging.joinpath(state.config.output_split_uki).exists():
+                copy_path(boot_binary, state.staging / state.config.output_split_uki)
 
-    if state.config.bootable is True and not state.staging.joinpath(state.config.output_split_uki.name).exists():
+    if state.config.bootable is True and not state.staging.joinpath(state.config.output_split_uki).exists():
         die("A bootable image was requested but no kernel was found")
 
 
-def compress_output(config: MkosiConfig, src: Path, uid: int, gid: int) -> None:
-    if not src.is_file():
-        return
+def compressor_command(compression: Compression) -> list[PathString]:
+    """Returns a command suitable for compressing archives."""
 
-    if not config.compress_output:
+    if compression == Compression.gz:
+        return [gzip_binary(), "--fast", "--stdout", "-"]
+    elif compression == Compression.xz:
+        return ["xz", "--check=crc32", "--fast", "-T0", "--stdout", "-"]
+    elif compression == Compression.zst:
+        return ["zstd", "-q", "-T0", "--stdout", "-"]
+    else:
+        die(f"Unknown compression {compression}")
+
+
+def maybe_compress(state: MkosiState, src: Path, dst: Optional[Path] = None) -> None:
+    if not state.config.compress_output or src.is_dir():
+        if dst:
+            shutil.move(src, dst)
         return
 
-    with complete_step(f"Compressing output file {src}…"):
-        run(compressor_command(config.compress_output, src), user=uid, group=gid)
+    if not dst:
+        dst = src.parent / f"{src.name}.{state.config.compress_output}"
+
+    with complete_step(f"Compressing {src}"):
+        with src.open("rb") as i:
+            src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
+
+            with dst.open("wb") as o:
+                run(compressor_command(state.config.compress_output),
+                    user=state.uid, group=state.gid, stdin=i, stdout=o)
 
 
 def copy_nspawn_settings(state: MkosiState) -> None:
@@ -836,7 +842,7 @@ def copy_nspawn_settings(state: MkosiState) -> None:
         return None
 
     with complete_step("Copying nspawn settings file…"):
-        copy_path(state.config.nspawn_settings, state.staging / state.config.output_nspawn_settings.name)
+        copy_path(state.config.nspawn_settings, state.staging / state.config.output_nspawn_settings)
 
 
 def hash_file(of: TextIO, path: Path) -> None:
@@ -858,11 +864,11 @@ def calculate_sha256sum(state: MkosiState) -> None:
         return None
 
     with complete_step("Calculating SHA256SUMS…"):
-        with open(state.workspace / state.config.output_checksum.name, "w") as f:
+        with open(state.workspace / state.config.output_checksum, "w") as f:
             for p in state.staging.iterdir():
                 hash_file(f, p)
 
-        os.rename(state.workspace / state.config.output_checksum.name, state.staging / state.config.output_checksum.name)
+        state.workspace.joinpath(state.config.output_checksum).rename(state.staging / state.config.output_checksum)
 
 
 def calculate_signature(state: MkosiState) -> None:
@@ -877,8 +883,8 @@ def calculate_signature(state: MkosiState) -> None:
             cmdline += ["--default-key", state.config.key]
 
         cmdline += [
-            "--output", state.staging / state.config.output_signature.name,
-            state.staging / state.config.output_checksum.name,
+            "--output", state.staging / state.config.output_signature,
+            state.staging / state.config.output_checksum,
         ]
 
         run(
@@ -910,7 +916,7 @@ def save_cache(state: MkosiState) -> None:
             shutil.move(state.build_overlay, build)
 
 
-def dir_size(path: PathString) -> int:
+def dir_size(path: Union[Path, os.DirEntry[str]]) -> int:
     dir_sum = 0
     for entry in os.scandir(path):
         if entry.is_symlink():
@@ -921,31 +927,31 @@ def dir_size(path: PathString) -> int:
         elif entry.is_file():
             dir_sum += entry.stat().st_blocks * 512
         elif entry.is_dir():
-            dir_sum += dir_size(entry.path)
+            dir_sum += dir_size(entry)
     return dir_sum
 
 
 def save_manifest(state: MkosiState, manifest: Manifest) -> None:
     if manifest.has_data():
         if ManifestFormat.json in state.config.manifest_format:
-            with complete_step(f"Saving manifest {state.config.output_manifest.name}"):
-                with open(state.staging / state.config.output_manifest.name, 'w') as f:
+            with complete_step(f"Saving manifest {state.config.output_manifest}"):
+                with open(state.staging / state.config.output_manifest, 'w') as f:
                     manifest.write_json(f)
 
         if ManifestFormat.changelog in state.config.manifest_format:
-            with complete_step(f"Saving report {state.config.output_changelog.name}"):
-                with open(state.staging / state.config.output_changelog.name, 'w') as f:
+            with complete_step(f"Saving report {state.config.output_changelog}"):
+                with open(state.staging / state.config.output_changelog, 'w') as f:
                     manifest.write_package_report(f)
 
 
 def print_output_size(config: MkosiConfig) -> None:
-    if not config.output_compressed.exists():
+    if not config.output_dir.joinpath(config.output).exists():
         return
 
     if config.output_format == OutputFormat.directory:
-        log_step("Resulting image size is " + format_bytes(dir_size(config.output)) + ".")
+        log_step("Resulting image size is " + format_bytes(dir_size(config.output_dir / config.output)) + ".")
     else:
-        st = os.stat(config.output_compressed)
+        st = config.output_dir.joinpath(config.output).stat()
         size = format_bytes(st.st_size)
         space = format_bytes(st.st_blocks * 512)
         log_step(f"Resulting image size is {size}, consumes {space}.")
@@ -960,44 +966,24 @@ def empty_directory(path: Path) -> None:
 
 
 def unlink_output(args: MkosiArgs, config: MkosiConfig) -> None:
-    with complete_step("Removing output files…"):
-        if config.output.parent.exists():
-            for p in config.output.parent.iterdir():
-                if p.name.startswith(config.output.name):
-                    unlink_try_hard(p)
-
-        unlink_try_hard(Path(f"{config.output}.manifest"))
-        unlink_try_hard(Path(f"{config.output}.changelog"))
-
-        if config.output_split_kernel.parent.exists():
-            for p in config.output_split_kernel.parent.iterdir():
-                if p.name.startswith(config.output_split_kernel.name):
-                    unlink_try_hard(p)
-        unlink_try_hard(config.output_split_kernel)
-
-        if config.output_split_uki.parent.exists():
-            for p in config.output_split_uki.parent.iterdir():
-                if p.name.startswith(config.output_split_uki.name):
-                    unlink_try_hard(p)
-        unlink_try_hard(config.output_split_uki)
-
-        if config.nspawn_settings is not None:
-            unlink_try_hard(config.output_nspawn_settings)
-
-    if config.ssh and config.output_sshkey is not None:
-        unlink_try_hard(config.output_sshkey)
-
-    # We remove any cached images if either the user used --force
-    # twice, or he/she called "clean" with it passed once. Let's also
-    # remove the downloaded package cache if the user specified one
-    # additional "--force".
+    # We remove any cached images if either the user used --force twice, or he/she called "clean" with it
+    # passed once. Let's also remove the downloaded package cache if the user specified one additional
+    # "--force". Let's also remove all versions if the user specified one additional "--force".
 
     if args.verb == Verb.clean:
         remove_build_cache = args.force > 0
         remove_package_cache = args.force > 1
+        prefix = config.output if args.force > 1 else config.output_with_version
     else:
         remove_build_cache = args.force > 1
         remove_package_cache = args.force > 2
+        prefix = config.output if args.force > 2 else config.output_with_version
+
+    with complete_step("Removing output files…"):
+        if config.output_dir.exists():
+            for p in config.output_dir.iterdir():
+                if p.name.startswith(prefix):
+                    unlink_try_hard(p)
 
     if remove_build_cache:
         for p in cache_tree_paths(config):
@@ -1005,33 +991,23 @@ def unlink_output(args: MkosiArgs, config: MkosiConfig) -> None:
                 with complete_step(f"Removing cache directory {p}…"):
                     unlink_try_hard(p)
 
-        if config.build_dir is not None and config.build_dir.exists() and any(config.build_dir.iterdir()):
+        if config.build_dir and config.build_dir.exists() and any(config.build_dir.iterdir()):
             with complete_step("Clearing out build directory…"):
                 empty_directory(config.build_dir)
 
-        if config.install_dir is not None and config.install_dir.exists() and any(config.install_dir.iterdir()):
+        if config.install_dir and config.install_dir.exists() and any(config.install_dir.iterdir()):
             with complete_step("Clearing out install directory…"):
                 empty_directory(config.install_dir)
 
     if remove_package_cache:
-        if config.cache_dir is not None and config.cache_dir.exists() and any(config.cache_dir.iterdir()):
+        if config.cache_dir and config.cache_dir.exists() and any(config.cache_dir.iterdir()):
             with complete_step("Clearing out package cache…"):
                 empty_directory(config.cache_dir)
 
 
 def cache_tree_paths(config: MkosiConfig) -> tuple[Path, Path]:
-
     assert config.cache_dir
-
-    # If the image ID is specified, use cache file names that are independent of the image versions, so that
-    # rebuilding and bumping versions is cheap and reuses previous versions if cached.
-    if config.image_id:
-        prefix = config.cache_dir / config.image_id
-    # Otherwise, derive the cache file names directly from the output file names.
-    else:
-        prefix = config.cache_dir / config.output.name
-
-    return (Path(f"{prefix}.cache"), Path(f"{prefix}.build.cache"))
+    return config.cache_dir / f"{config.output}.cache", config.cache_dir / f"{config.output}.build.cache"
 
 
 def check_tree_input(path: Optional[Path]) -> None:
@@ -1093,9 +1069,8 @@ def check_outputs(config: MkosiConfig) -> None:
         config.output_checksum if config.checksum else None,
         config.output_signature if config.sign else None,
         config.output_nspawn_settings if config.nspawn_settings is not None else None,
-        config.output_sshkey if config.ssh else None,
     ):
-        if f and f.exists():
+        if f and config.output_dir.joinpath(f).exists():
             die(f"Output path {f} exists already. (Consider invocation with --force.)")
 
 
@@ -1183,7 +1158,7 @@ def print_summary(args: MkosiArgs, config: MkosiConfig) -> None:
               Manifest Formats: {maniformats}
               Output Directory: {none_to_default(config.output_dir)}
            Workspace Directory: {none_to_default(config.workspace_dir)}
-                        Output: {bold(config.output_compressed)}
+                        Output: {bold(config.output_with_compression)}
                Output Checksum: {none_to_na(config.output_checksum if config.checksum else None)}
               Output Signature: {none_to_na(config.output_signature if config.sign else None)}
         Output nspawn Settings: {none_to_na(config.output_nspawn_settings if config.nspawn_settings is not None else None)}
@@ -1238,9 +1213,6 @@ def print_summary(args: MkosiArgs, config: MkosiConfig) -> None:
 
 def make_output_dir(state: MkosiState) -> None:
     """Create the output directory if set and not existing yet"""
-    if state.config.output_dir is None:
-        return
-
     run(["mkdir", "-p", state.config.output_dir], user=state.uid, group=state.gid)
 
 
@@ -1384,9 +1356,9 @@ def reuse_cache_tree(state: MkosiState) -> bool:
     return True
 
 
-def invoke_repart(state: MkosiState, skip: Sequence[str] = [], split: bool = False) -> Optional[str]:
+def invoke_repart(state: MkosiState, skip: Sequence[str] = [], split: bool = False) -> tuple[Optional[str], list[Path]]:
     if not state.config.output_format == OutputFormat.disk:
-        return None
+        return None, []
 
     cmdline: list[PathString] = [
         "systemd-repart",
@@ -1395,10 +1367,10 @@ def invoke_repart(state: MkosiState, skip: Sequence[str] = [], split: bool = Fal
         "--dry-run=no",
         "--json=pretty",
         "--root", state.root,
-        state.staging / state.config.output.name,
+        state.staging / state.config.output_with_format,
     ]
 
-    if not state.staging.joinpath(state.config.output.name).exists():
+    if not state.staging.joinpath(state.config.output_with_format).exists():
         cmdline += ["--empty=create"]
     if state.config.passphrase:
         cmdline += ["--key-file", state.config.passphrase]
@@ -1480,7 +1452,9 @@ def invoke_repart(state: MkosiState, skip: Sequence[str] = [], split: bool = Fal
         else:
             roothash = roothash or h
 
-    return f"roothash={roothash}" if roothash else f"usrhash={usrhash}" if usrhash else None
+    split_paths = [Path(p["split_path"]) for p in output if p.get("split_path", "-") != "-"]
+
+    return f"roothash={roothash}" if roothash else f"usrhash={usrhash}" if usrhash else None, split_paths
 
 
 def build_image(state: MkosiState, *, for_cache: bool, manifest: Optional[Manifest] = None) -> None:
@@ -1524,9 +1498,12 @@ def build_image(state: MkosiState, *, for_cache: bool, manifest: Optional[Manife
         run_finalize_script(state)
         run_selinux_relabel(state)
 
-    roothash = invoke_repart(state, skip=("esp", "xbootldr"))
+    roothash, _ = invoke_repart(state, skip=("esp", "xbootldr"))
     install_unified_kernel(state, roothash)
-    invoke_repart(state, split=True)
+    _, split_paths = invoke_repart(state, split=True)
+
+    for p in split_paths:
+        maybe_compress(state, p)
 
     make_tar(state)
     make_initrd(state)
@@ -1624,7 +1601,7 @@ def acl_toggle_build(state: MkosiState) -> Iterator[None]:
 
         yield
     finally:
-        for p in (*state.config.base_trees, state.config.cache_dir, *state.config.output_paths()):
+        for p in (*state.config.base_trees, state.config.cache_dir, state.config.output_dir / state.config.output):
             if p and p.is_dir():
                 with complete_step(f"Adding ACLs to {p}"):
                     setfacl(p, state.uid, allow=True)
@@ -1663,24 +1640,23 @@ def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> Non
             state = dataclasses.replace(state, )
             build_image(state, manifest=manifest, for_cache=False)
 
+        maybe_compress(state,
+                       state.staging / state.config.output_with_format,
+                       state.staging / state.config.output_with_compression)
+
         copy_nspawn_settings(state)
         calculate_sha256sum(state)
         calculate_signature(state)
         save_manifest(state, manifest)
 
-        for p in state.config.output_paths():
-            if state.staging.joinpath(p.name).exists():
-                shutil.move(state.staging / p.name, p)
-                if p != state.config.output or state.config.output_format != OutputFormat.directory:
-                    os.chown(p, uid, gid)
-                if p == state.config.output:
-                    compress_output(state.config, p, uid=uid, gid=gid)
+        for f in state.staging.iterdir():
+            if not f.is_dir():
+                os.chown(f, uid, gid)
 
-        for p in state.staging.iterdir():
-            shutil.move(p, state.config.output.parent / p.name)
-            os.chown(state.config.output.parent / p.name, uid, gid)
-            if p.name.startswith(state.config.output.name):
-                compress_output(state.config, p, uid=uid, gid=gid)
+            shutil.move(f, state.config.output_dir)
+
+        if not state.config.output_dir.joinpath(state.config.output).exists():
+            state.config.output_dir.joinpath(state.config.output).symlink_to(state.config.output_with_compression)
 
     print_output_size(config)
 
@@ -1691,7 +1667,7 @@ def check_root() -> None:
 
 
 def machine_cid(config: MkosiConfig) -> int:
-    cid = int.from_bytes(hashlib.sha256(machine_name(config).encode()).digest()[:4], byteorder='little')
+    cid = int.from_bytes(hashlib.sha256(config.output_with_version.encode()).digest()[:4], byteorder='little')
     # Make sure we don't return any of the well-known CIDs.
     return max(3, min(cid, 0xFFFFFFFF - 1))
 
@@ -1718,25 +1694,25 @@ def acl_toggle_boot(config: MkosiConfig) -> Iterator[None]:
     uid = InvokingUser.uid()
 
     try:
-        with complete_step(f"Removing ACLs from {config.output}"):
-            setfacl(config.output, uid, allow=False)
+        with complete_step(f"Removing ACLs from {config.output_dir / config.output}"):
+            setfacl(config.output_dir / config.output, uid, allow=False)
         yield
     finally:
-        with complete_step(f"Adding ACLs to {config.output}"):
-            setfacl(config.output, uid, allow=True)
+        with complete_step(f"Adding ACLs to {config.output_dir / config.output}"):
+            setfacl(config.output_dir / config.output, uid, allow=True)
 
 
 def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
     cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
 
     if config.output_format == OutputFormat.directory:
-        cmdline += ["--directory", config.output]
+        cmdline += ["--directory", config.output_dir / config.output]
 
-        owner = os.stat(config.output).st_uid
+        owner = os.stat(config.output_dir / config.output).st_uid
         if owner != 0:
             cmdline += [f"--private-users={str(owner)}"]
     else:
-        cmdline += ["--image", config.output]
+        cmdline += ["--image", config.output_dir / config.output]
 
     # If we copied in a .nspawn file, make sure it's actually honoured
     if config.nspawn_settings is not None:
@@ -1755,7 +1731,7 @@ def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
     if config.ephemeral:
         cmdline += ["--ephemeral"]
 
-    cmdline += ["--machine", machine_name(config)]
+    cmdline += ["--machine", config.output]
     cmdline += [f"--bind={config.build_sources}:/root/src", "--chdir=/root/src"]
 
     for k, v in config.credentials.items():
@@ -1961,7 +1937,7 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
             ]
 
         if config.ephemeral:
-            f = stack.enter_context(tempfile.NamedTemporaryFile(prefix=".mkosi-", dir=config.output.parent))
+            f = stack.enter_context(tempfile.NamedTemporaryFile(prefix=".mkosi-", dir=config.output_dir))
             fname = Path(f.name)
 
             # So on one hand we want CoW off, since this stuff will
@@ -1973,13 +1949,13 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
             # CoW but later changes do not result in CoW anymore.
 
             run(["chattr", "+C", fname], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
-            copy_path(config.output_compressed, fname)
+            copy_path(config.output_dir / config.output, fname)
         else:
-            fname = config.output_compressed
+            fname = config.output_dir / config.output
 
         # Debian images fail to boot with virtio-scsi, see: https://github.com/systemd/mkosi/issues/725
         if config.output_format == OutputFormat.cpio:
-            kernel = (config.output_dir or Path.cwd()) / config.output_split_kernel
+            kernel = config.output_dir / config.output_split_kernel
             if not kernel.exists() and "-kernel" not in args.cmdline:
                 die("No kernel found, please install a kernel in the cpio or provide a -kernel argument to mkosi qemu")
             cmdline += ["-kernel", kernel,
@@ -2099,7 +2075,10 @@ def expand_specifier(s: str) -> str:
 
 
 def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
-    return args.verb == Verb.build or (args.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or args.force > 0))
+    if args.verb == Verb.build:
+        return True
+
+    return args.verb in MKOSI_COMMANDS_NEED_BUILD and (args.force > 0 or not config.output_dir.joinpath(config.output).exists())
 
 
 def run_verb(args: MkosiArgs, presets: Sequence[MkosiConfig]) -> None:
index 2af6ab6d7166aa4492c95f48536d5e68d536d744..4839c64b057ee6abf0ae67f2344ab9d1504ff26f 100644 (file)
@@ -394,6 +394,22 @@ def config_make_path_parser(*,
     return config_parse_path
 
 
+def config_parse_filename(dest: str, value: Optional[str], namespace: argparse.Namespace) -> Optional[str]:
+    if dest in namespace:
+        return getattr(namespace, dest) # type: ignore
+
+    if not value:
+        return None
+
+    if value == "." or value == "..":
+        die(". and .. are not valid filenames")
+
+    if "/" in value:
+        die(f"{value} is not a valid filename")
+
+    return value
+
+
 def match_path_exists(value: str) -> bool:
     if not value:
         return False
@@ -529,8 +545,8 @@ class MkosiConfig:
     architecture: str
     output_format: OutputFormat
     manifest_format: list[ManifestFormat]
-    output: Path
-    output_dir: Optional[Path]
+    output: str
+    output_dir: Path
     kernel_command_line: list[str]
     secure_boot: bool
     secure_boot_key: Optional[Path]
@@ -603,56 +619,62 @@ class MkosiConfig:
         return self.architecture == platform.machine()
 
     @property
-    def output_split_uki(self) -> Path:
-        return build_auxiliary_output_path(self, ".efi")
+    def output_with_version(self) -> str:
+        output = self.output
 
-    @property
-    def output_split_kernel(self) -> Path:
-        return build_auxiliary_output_path(self, ".vmlinuz")
+        if self.image_version:
+            output += f"_{self.image_version}"
+
+        return output
 
     @property
-    def output_nspawn_settings(self) -> Path:
-        return build_auxiliary_output_path(self, ".nspawn")
+    def output_with_format(self) -> str:
+        output = self.output_with_version
+
+        output += {
+            OutputFormat.disk: ".raw",
+            OutputFormat.cpio: ".cpio",
+            OutputFormat.tar: ".tar",
+        }.get(self.output_format, "")
+
+        return output
 
     @property
-    def output_checksum(self) -> Path:
-        return build_auxiliary_output_path(self, ".SHA256SUMS")
+    def output_with_compression(self) -> str:
+        output = self.output_with_format
+
+        if self.compress_output:
+            output += f".{self.compress_output}"
+
+        return output
 
     @property
-    def output_signature(self) -> Path:
-        return build_auxiliary_output_path(self, ".SHA256SUMS.gpg")
+    def output_split_uki(self) -> str:
+        return f"{self.output_with_version}.efi"
 
     @property
-    def output_sshkey(self) -> Path:
-        return build_auxiliary_output_path(self, ".ssh")
+    def output_split_kernel(self) -> str:
+        return f"{self.output_with_version}.vmlinuz"
 
     @property
-    def output_manifest(self) -> Path:
-        return build_auxiliary_output_path(self, ".manifest")
+    def output_nspawn_settings(self) -> str:
+        return f"{self.output_with_version}.nspawn"
 
     @property
-    def output_changelog(self) -> Path:
-        return build_auxiliary_output_path(self, ".changelog")
+    def output_checksum(self) -> str:
+        return f"{self.output_with_version}.SHA256SUMS"
 
     @property
-    def output_compressed(self) -> Path:
-        if not self.compress_output:
-            return self.output
+    def output_signature(self) -> str:
+        return f"{self.output_with_version}.SHA256SUMS.gpg"
 
-        return self.output.parent / f"{self.output.name}.{self.compress_output}"
+    @property
+    def output_manifest(self) -> str:
+        return f"{self.output_with_version}.manifest"
 
-    def output_paths(self) -> tuple[Path, ...]:
-        return (
-            self.output,
-            self.output_split_uki,
-            self.output_split_kernel,
-            self.output_nspawn_settings,
-            self.output_checksum,
-            self.output_signature,
-            self.output_sshkey,
-            self.output_manifest,
-            self.output_changelog,
-        )
+    @property
+    def output_changelog(self) -> str:
+        return f"{self.output_with_version}.changelog"
 
 
 class MkosiConfigParser:
@@ -719,7 +741,7 @@ class MkosiConfigParser:
         MkosiConfigSetting(
             dest="output",
             section="Output",
-            parse=config_make_path_parser(required=False, absolute=False),
+            parse=config_parse_filename,
         ),
         MkosiConfigSetting(
             dest="output_dir",
@@ -727,6 +749,7 @@ class MkosiConfigParser:
             section="Output",
             parse=config_make_path_parser(required=False),
             paths=("mkosi.output",),
+            default=Path("."),
         ),
         MkosiConfigSetting(
             dest="workspace_dir",
@@ -908,7 +931,7 @@ class MkosiConfigParser:
             dest="build_sources",
             section="Content",
             parse=config_make_path_parser(),
-            default=".",
+            default=Path("."),
         ),
         MkosiConfigSetting(
             dest="build_packages",
@@ -1295,13 +1318,13 @@ class MkosiConfigParser:
         group.add_argument(
             "-o", "--output",
             metavar="PATH",
-            help="Output image path",
+            help="Output name",
             action=action,
         )
         group.add_argument(
             "-O", "--output-dir",
             metavar="DIR",
-            help="Output root directory",
+            help="Output directory",
             action=action,
         )
         group.add_argument(
@@ -1810,11 +1833,6 @@ def strip_suffixes(path: Path) -> Path:
     return path
 
 
-def build_auxiliary_output_path(args: Union[argparse.Namespace, MkosiConfig], suffix: str) -> Path:
-    output = strip_suffixes(args.output)
-    return output.with_name(f"{output.name}{suffix}")
-
-
 def find_image_version(args: argparse.Namespace) -> None:
     if args.image_version is not None:
         return
@@ -1849,11 +1867,6 @@ def find_password(args: argparse.Namespace) -> None:
         pass
 
 
-
-def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
-    return config.image_id or config.output.with_suffix("").name.partition("_")[0]
-
-
 def load_credentials(args: argparse.Namespace) -> dict[str, str]:
     creds = {}
 
@@ -1883,7 +1896,7 @@ def load_credentials(args: argparse.Namespace) -> dict[str, str]:
         creds["firstboot.locale"] = "C.UTF-8"
 
     if "firstboot.hostname" not in creds:
-        creds["firstboot.hostname"] = machine_name(args)
+        creds["firstboot.hostname"] = args.output
 
     if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ:
         key = run(
@@ -1951,26 +1964,7 @@ def load_config(args: argparse.Namespace) -> MkosiConfig:
         args.compress_output = Compression.zst if args.output_format == OutputFormat.cpio else Compression.none
 
     if args.output is None:
-        iid = args.image_id or args.preset or "image"
-        prefix = f"{iid}_{args.image_version}" if args.image_version is not None else iid
-
-        if args.output_format == OutputFormat.disk:
-            output = f"{prefix}.raw"
-        elif args.output_format == OutputFormat.tar:
-            output = f"{prefix}.tar"
-        elif args.output_format == OutputFormat.cpio:
-            output = f"{prefix}.cpio"
-        else:
-            output = prefix
-        args.output = Path(output)
-
-    if args.output_dir is not None:
-        if "/" not in str(args.output):
-            args.output = args.output_dir / args.output
-        else:
-            logging.warning("Ignoring configured output directory as output file is a qualified path.")
-
-    args.output = args.output.absolute()
+        args.output = args.image_id or args.preset or "image"
 
     if args.environment:
         env = {}