from textwrap import dedent
from typing import Callable, ContextManager, Optional, TextIO, TypeVar, Union, cast
-from mkosi.config import GenericVersion, MkosiConfig, machine_name
+from mkosi.config import GenericVersion, MkosiArgs, MkosiConfig, machine_name
from mkosi.install import add_dropin_config_from_resource, copy_path, flock
from mkosi.log import ARG_DEBUG, Style, color_error, complete_step, die, log_step
from mkosi.manifest import Manifest
pass
-def unlink_output(config: MkosiConfig) -> None:
+def unlink_output(args: MkosiArgs, config: MkosiConfig) -> None:
with complete_step("Removing output files…"):
if config.output.parent.exists():
for p in config.output.parent.iterdir():
# remove the downloaded package cache if the user specified one
# additional "--force".
- if config.verb == Verb.clean:
- remove_build_cache = config.force > 0
- remove_package_cache = config.force > 1
+ if args.verb == Verb.clean:
+ remove_build_cache = args.force > 0
+ remove_package_cache = args.force > 1
else:
- remove_build_cache = config.force > 1
- remove_package_cache = config.force > 2
+ remove_build_cache = args.force > 1
+ remove_package_cache = args.force > 2
if remove_build_cache:
with complete_step("Removing incremental cache files…"):
return "\n ".join(items)
-def print_summary(config: MkosiConfig) -> None:
+def print_summary(args: MkosiArgs, config: MkosiConfig) -> None:
b = Style.bold
e = Style.reset
bold: Callable[..., str] = lambda s: f"{b}{s}{e}"
summary = f"""\
{bold("COMMANDS")}:
- verb: {bold(config.verb)}
- cmdline: {bold(" ".join(config.cmdline))}
+ verb: {bold(args.verb)}
+ cmdline: {bold(" ".join(args.cmdline))}
{bold("DISTRIBUTION")}
Distribution: {bold(config.distribution.name)}
GPG Key: ({"default" if config.key is None else config.key})
"""
- page(summary, config.pager)
+ page(summary, args.pager)
def make_output_dir(state: MkosiState) -> None:
stdout=sys.stdout, env=env | state.environment)
-def need_cache_tree(state: MkosiState) -> bool:
+def need_cache_tree(args: MkosiArgs, state: MkosiState) -> bool:
if not state.config.incremental:
return False
- if state.config.force > 1:
+ if args.force > 1:
return True
final, build = cache_tree_paths(state.config)
return not final.exists() or (state.config.build_script is not None and not build.exists())
-def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None:
+def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> None:
workspace = tempfile.TemporaryDirectory(dir=config.workspace_dir or Path.cwd(), prefix=".mkosi.tmp")
workspace_dir = Path(workspace.name)
cache = config.cache_dir or workspace_dir / "cache"
# while we are working on it.
with flock(workspace_dir), workspace:
# If caching is requested, then make sure we have cache trees around we can make use of
- if need_cache_tree(state):
+ if need_cache_tree(args, state):
with complete_step("Building cache image"):
build_image(state, for_cache=True)
save_cache(state)
return "unrecognized option" not in c.stderr
-def run_shell(config: MkosiConfig) -> None:
+def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
if config.output_format in (OutputFormat.directory, OutputFormat.subvolume):
if config.nspawn_settings is not None:
cmdline += ["--settings=trusted"]
- if config.verb == Verb.boot:
+ if args.verb == Verb.boot:
cmdline += ["--boot"]
else:
cmdline += [f"--rlimit=RLIMIT_CORE={format_rlimit(resource.RLIMIT_CORE)}"]
for k, v in config.credentials.items():
cmdline += [f"--set-credential={k}:{v}"]
- if config.verb == Verb.boot:
+ if args.verb == Verb.boot:
# Add nspawn options first since systemd-nspawn ignores all options after the first argument.
- cmdline += config.cmdline
+ cmdline += args.cmdline
# kernel cmdline config of the form systemd.xxx= get interpreted by systemd when running in nspawn as
# well.
cmdline += config.kernel_command_line
cmdline += config.kernel_command_line_extra
- elif config.cmdline:
+ elif args.cmdline:
cmdline += ["--"]
- cmdline += config.cmdline
+ cmdline += args.cmdline
uid = InvokingUser.uid()
swtpm_proc.wait()
-def run_qemu(config: MkosiConfig) -> None:
+def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
accel = "kvm" if config.qemu_kvm else "tcg"
firmware, fw_supports_sb = find_qemu_firmware(config)
# Debian images fail to boot with virtio-scsi, see: https://github.com/systemd/mkosi/issues/725
if config.output_format == OutputFormat.cpio:
kernel = (config.output_dir or Path.cwd()) / config.output_split_kernel
- if not kernel.exists() and "-kernel" not in config.cmdline:
+ if not kernel.exists() and "-kernel" not in args.cmdline:
die("No kernel found, please install a kernel in the cpio or provide a -kernel argument to mkosi qemu")
cmdline += ["-kernel", kernel,
"-initrd", fname,
cmdline += ["-device", "tpm-tis-device,tpmdev=tpm0"]
cmdline += config.qemu_args
- cmdline += config.cmdline
+ cmdline += args.cmdline
run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
-def run_ssh(config: MkosiConfig) -> None:
+def run_ssh(args: MkosiArgs, config: MkosiConfig) -> None:
cmd = [
"ssh",
# Silence known hosts file errors/warnings.
"root@mkosi",
]
- cmd += config.cmdline
+ cmd += args.cmdline
run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
httpd.serve_forever()
-def generate_secure_boot_key(config: MkosiConfig) -> None:
+def generate_secure_boot_key(args: MkosiArgs, config: MkosiConfig) -> None:
"""Generate secure boot keys using openssl"""
keylength = 2048
cn = expand_specifier(config.secure_boot_common_name)
for f in (config.secure_boot_key, config.secure_boot_certificate):
- if f and not config.force:
+ if f and not args.force:
die(f"{f} already exists",
hint=("To generate new secure boot keys, "
f"first remove {config.secure_boot_key} {config.secure_boot_certificate}"))
return s.replace("%u", InvokingUser.name())
-def needs_build(config: Union[argparse.Namespace, MkosiConfig]) -> bool:
- return config.verb == Verb.build or (config.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or config.force > 0))
+def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
+ return args.verb == Verb.build or (args.verb in MKOSI_COMMANDS_NEED_BUILD and (not config.output_compressed.exists() or args.force > 0))
-def run_verb(config: MkosiConfig) -> None:
+def run_verb(args: MkosiArgs, config: MkosiConfig) -> None:
with prepend_to_environ_path(config.extra_search_paths):
- if config.verb == Verb.genkey:
- return generate_secure_boot_key(config)
+ if args.verb == Verb.genkey:
+ return generate_secure_boot_key(args, config)
- if config.verb == Verb.bump:
+ if args.verb == Verb.bump:
return bump_image_version(config)
- if config.verb == Verb.summary:
- return print_summary(config)
+ if args.verb == Verb.summary:
+ return print_summary(args, config)
- if config.verb in MKOSI_COMMANDS_SUDO:
+ if args.verb in MKOSI_COMMANDS_SUDO:
check_root()
- if config.verb == Verb.build:
+ if args.verb == Verb.build:
check_inputs(config)
- if not config.force:
+ if not args.force:
check_outputs(config)
- if needs_build(config) or config.verb == Verb.clean:
+ if needs_build(args, config) or args.verb == Verb.clean:
def target() -> None:
become_root()
- unlink_output(config)
+ unlink_output(args, config)
fork_and_wait(target)
- if needs_build(config):
+ if needs_build(args, config):
def target() -> None:
# Get the user UID/GID either on the host or in the user namespace running the build
uid, gid = become_root()
init_mount_namespace()
- build_stuff(uid, gid, config)
+ build_stuff(uid, gid, args, config)
# We only want to run the build in a user namespace but not the following steps. Since we can't
# rejoin the parent user namespace after unsharing from it, let's run the build in a fork so that
if config.auto_bump:
bump_image_version(config)
- if config.verb in (Verb.shell, Verb.boot):
- run_shell(config)
+ if args.verb in (Verb.shell, Verb.boot):
+ run_shell(args, config)
- if config.verb == Verb.qemu:
- run_qemu(config)
+ if args.verb == Verb.qemu:
+ run_qemu(args, config)
- if config.verb == Verb.ssh:
- run_ssh(config)
+ if args.verb == Verb.ssh:
+ run_ssh(args, config)
- if config.verb == Verb.serve:
+ if args.verb == Verb.serve:
run_serve(config)
import enum
import fnmatch
import functools
+import inspect
import logging
import operator
import os.path
parser.exit()
+@dataclasses.dataclass(frozen=True)
+class MkosiArgs:
+ verb: Verb
+ cmdline: list[str]
+ force: int
+ directory: Optional[Path]
+ debug: bool
+ debug_shell: bool
+ pager: bool
+
+ @classmethod
+ def from_namespace(cls, ns: argparse.Namespace) -> "MkosiArgs":
+ return cls(**{
+ k: v for k, v in vars(ns).items()
+ if k in inspect.signature(cls).parameters
+ })
+
+
@dataclasses.dataclass(frozen=True)
class MkosiConfig:
"""Type-hinted storage for command line arguments.
access the value from state.
"""
- verb: Verb
- cmdline: list[str]
- force: int
-
distribution: Distribution
release: str
mirror: Optional[str]
ephemeral: bool
ssh: bool
credentials: dict[str, str]
- directory: Optional[Path]
- debug: bool
- debug_shell: bool
auto_bump: bool
workspace_dir: Optional[Path]
initrds: list[Path]
make_initrd: bool
kernel_command_line_extra: list[str]
acl: bool
- pager: bool
bootable: Optional[bool]
# QEMU-specific options
passphrase: Optional[Path]
+ @classmethod
+ def from_namespace(cls, ns: argparse.Namespace) -> "MkosiConfig":
+ return cls(**{
+ k: v for k, v in vars(ns).items()
+ if k in inspect.signature(cls).parameters
+ })
+
def architecture_is_native(self) -> bool:
return self.architecture == platform.machine()
help="Change to specified directory before doing anything",
type=Path,
metavar="PATH",
+ default=None,
)
parser.add_argument(
"--debug",
return parser
- def parse(self, args: Optional[Sequence[str]] = None) -> MkosiConfig:
+ def parse(self, argv: Optional[Sequence[str]] = None) -> tuple[MkosiArgs, MkosiConfig]:
namespace = argparse.Namespace()
- if args is None:
- args = sys.argv[1:]
- args = list(args)
+ if argv is None:
+ argv = sys.argv[1:]
+ argv = list(argv)
# Make sure the verb command gets explicitly passed. Insert a -- before the positional verb argument
# otherwise it might be considered as an argument of a parameter with nargs='?'. For example mkosi -i
# summary would be treated as -i=summary.
for verb in Verb:
try:
- v_i = args.index(verb.name)
+ v_i = argv.index(verb.name)
except ValueError:
continue
- if v_i > 0 and args[v_i - 1] != "--":
- args.insert(v_i, "--")
+ if v_i > 0 and argv[v_i - 1] != "--":
+ argv.insert(v_i, "--")
break
else:
- args += ["--", "build"]
+ argv += ["--", "build"]
argparser = self.create_argument_parser()
- argparser.parse_args(args, namespace)
+ argparser.parse_args(argv, namespace)
- if namespace.verb == Verb.help:
- PagerHelpAction.__call__(None, argparser, namespace) # type: ignore
+ args = load_args(namespace)
- if "directory" not in namespace:
- setattr(namespace, "directory", None)
+ if args.verb == Verb.help:
+ PagerHelpAction.__call__(None, argparser, namespace) # type: ignore
- if namespace.directory and not namespace.directory.is_dir():
+ if args.directory and not args.directory.is_dir():
die(f"Error: {namespace.directory} is not a directory!")
- p = namespace.directory or Path.cwd()
- with chdir(p):
- self.parse_config(namespace.directory or Path.cwd(), namespace)
+ with chdir(args.directory or Path.cwd()):
+ self.parse_config(Path("."), namespace)
for s in self.SETTINGS:
if s.dest in namespace:
setattr(namespace, s.dest, default)
- return load_args(namespace)
+ return args, load_config(namespace)
class GenericVersion:
return cmdline
-def load_args(args: argparse.Namespace) -> MkosiConfig:
+def load_args(args: argparse.Namespace) -> MkosiArgs:
ARG_DEBUG.set(args.debug)
ARG_DEBUG_SHELL.set(args.debug_shell)
+ return MkosiArgs.from_namespace(args)
+
+
+def load_config(args: argparse.Namespace) -> MkosiConfig:
find_image_version(args)
if args.cmdline and args.verb not in MKOSI_COMMANDS_CMDLINE:
if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
die("This unprivileged build configuration requires at least Linux v5.11")
- return MkosiConfig(**vars(args))
+ return MkosiConfig.from_namespace(args)
+
import pytest
from mkosi.util import Compression, Distribution, Verb
-from mkosi.config import MkosiConfigParser, MkosiConfig
+from mkosi.config import MkosiConfigParser, MkosiConfig, MkosiArgs
@contextmanager
chdir(old_dir)
-def parse(argv: Optional[List[str]] = None) -> MkosiConfig:
+def parse(argv: Optional[List[str]] = None) -> tuple[MkosiArgs, MkosiConfig]:
return MkosiConfigParser().parse(argv)
def test_parse_load_verb() -> None:
with cd_temp_dir():
- assert parse(["build"]).verb == Verb.build
- assert parse(["clean"]).verb == Verb.clean
+ assert parse(["build"])[0].verb == Verb.build
+ assert parse(["clean"])[0].verb == Verb.clean
with pytest.raises(SystemExit):
parse(["help"])
- assert parse(["genkey"]).verb == Verb.genkey
- assert parse(["bump"]).verb == Verb.bump
- assert parse(["serve"]).verb == Verb.serve
- assert parse(["build"]).verb == Verb.build
- assert parse(["shell"]).verb == Verb.shell
- assert parse(["boot"]).verb == Verb.boot
- assert parse(["qemu"]).verb == Verb.qemu
+ assert parse(["genkey"])[0].verb == Verb.genkey
+ assert parse(["bump"])[0].verb == Verb.bump
+ assert parse(["serve"])[0].verb == Verb.serve
+ assert parse(["build"])[0].verb == Verb.build
+ assert parse(["shell"])[0].verb == Verb.shell
+ assert parse(["boot"])[0].verb == Verb.boot
+ assert parse(["qemu"])[0].verb == Verb.qemu
with pytest.raises(SystemExit):
parse(["invalid"])
def test_os_distribution() -> None:
with cd_temp_dir():
for dist in Distribution:
- assert parse(["-d", dist.name]).distribution == dist
+ assert parse(["-d", dist.name])[1].distribution == dist
with pytest.raises(tuple((argparse.ArgumentError, SystemExit))):
parse(["-d", "invalidDistro"])
for dist in Distribution:
config = Path("mkosi.conf")
config.write_text(f"[Distribution]\nDistribution={dist}")
- assert parse([]).distribution == dist
+ assert parse([])[1].distribution == dist
def test_parse_config_files_filter() -> None:
(confd / "10-file.conf").write_text("[Content]\nPackages=yes")
(confd / "20-file.noconf").write_text("[Content]\nPackages=nope")
- assert parse([]).packages == ["yes"]
+ assert parse([])[1].packages == ["yes"]
def test_shell_boot() -> None:
def test_compression() -> None:
with cd_temp_dir():
- assert parse(["--format", "disk", "--compress-output", "False"]).compress_output == Compression.none
+ assert parse(["--format", "disk", "--compress-output", "False"])[1].compress_output == Compression.none
@pytest.mark.parametrize("dist1,dist2", itertools.combinations_with_replacement(Distribution, 2))