*(["-f"] * context.args.force),
]
- with resource_path(mkosi.resources) as r:
- cmdline += ["--include", os.fspath(r / "mkosi-initrd")]
+ with resource_path(mkosi.resources, "mkosi-initrd") as r:
+ cmdline += ["--include", os.fspath(r)]
for include in context.config.initrd_include:
cmdline += ["--include", os.fspath(include)]
args, [config] = parse_config(cmdline + ["build"])
+ make_executable(
+ *config.prepare_scripts,
+ *config.postinst_scripts,
+ *config.finalize_scripts,
+ *config.build_scripts,
+ )
+
config = dataclasses.replace(config, image="default-initrd")
assert config.output_dir
if not p.is_file():
die(f"Initrd {p} is not a file")
+ for script in config.prepare_scripts + config.build_scripts + config.postinst_scripts + config.finalize_scripts:
+ if not os.access(script, os.X_OK):
+ die(f"{script} is not executable")
+
def check_outputs(config: Config) -> None:
for f in (
}
with (
- resource_path(mkosi.resources) as r,
+ resource_path(mkosi.resources, f"repart/definitions/{context.config.output_format}.repart.d") as r,
complete_step(f"Building {context.config.output_format} extension image")
):
bwrap(
context,
- cmdline + ["--definitions", r / f"repart/definitions/{context.config.output_format}.repart.d"],
+ cmdline + ["--definitions", r],
devices=not context.config.repart_offline,
env=env,
)
def run_shell(args: Args, config: Config) -> None:
+ opname = "acquire shell in" if args.verb == Verb.shell else "boot"
+ if config.output_format in (OutputFormat.tar, OutputFormat.cpio):
+ die(f"Sorry, can't {opname} a {config.output_format} archive.")
+ if config.output_format.use_outer_compression() and config.compress_output:
+ die(f"Sorry, can't {opname} a compressed image.")
+
cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
# If we copied in a .nspawn file, make sure it's actually honoured
if config.output_format not in (OutputFormat.disk, OutputFormat.directory):
die(f"{config.output_format} images cannot be inspected with {tool}")
+ if (
+ args.verb in (Verb.journalctl, Verb.coredumpctl)
+ and config.output_format == OutputFormat.disk
+ and os.getuid() != 0
+ ):
+ die(f"Must be root to run the {args.verb} command")
+
if (tool_path := find_binary(tool)) is None:
die(f"Failed to find {tool}")
while formats:
form = formats.pop(0)
try:
- with resource_path(mkosi.resources) as r:
- if form == DocFormat.man:
- man = r / "mkosi.1"
+ if form == DocFormat.man:
+ with resource_path(mkosi.resources, "mkosi.1") as man:
if not man.exists():
raise FileNotFoundError()
run(["man", "--local-file", man])
- return
- elif form == DocFormat.pandoc:
- if not shutil.which("pandoc"):
- logging.error("pandoc is not available")
- mdr = r / "mkosi.md"
+ return
+ elif form == DocFormat.pandoc:
+ if not shutil.which("pandoc"):
+ logging.error("pandoc is not available")
+ with resource_path(mkosi.resources, "mkosi.md") as mdr:
pandoc = run(["pandoc", "-t", "man", "-s", mdr], stdout=subprocess.PIPE)
- run(["man", "--local-file", "-"], input=pandoc.stdout)
- return
- elif form == DocFormat.markdown:
- md = (r / "mkosi.md").read_text()
- page(md, args.pager)
- return
- elif form == DocFormat.system:
- run(["man", "mkosi"])
- return
+ run(["man", "--local-file", "-"], input=pandoc.stdout)
+ return
+ elif form == DocFormat.markdown:
+ with resource_path(mkosi.resources, "mkosi.md") as mdr:
+ page(mdr.read_text(), args.pager)
+ return
+ elif form == DocFormat.system:
+ run(["man", "mkosi"])
+ return
except (FileNotFoundError, subprocess.CalledProcessError) as e:
if not formats:
if isinstance(e, FileNotFoundError):
os.environ["PATH"] = ":".join(olds)
-def finalize_tools(args: Args, images: Sequence[Config]) -> Sequence[Config]:
- new = []
+@contextlib.contextmanager
+def finalize_default_tools(args: Args, config: Config) -> Iterator[Config]:
+ distribution = config.tools_tree_distribution or config.distribution.default_tools_tree_distribution()
+ if not distribution:
+ die(f"{config.distribution} does not have a default tools tree distribution",
+ hint="use ToolsTreeDistribution= to set one explicitly")
+
+ release = config.tools_tree_release or distribution.default_release()
+ mirror = (
+ config.tools_tree_mirror or
+ (config.mirror if config.mirror and config.distribution == distribution else None)
+ )
- for config in images:
- if not config.tools_tree or config.tools_tree.name != "default":
- new.append(config)
- continue
+ cmdline = [
+ "--directory", "",
+ "--distribution", str(distribution),
+ *(["--release", release] if release else []),
+ *(["--mirror", mirror] if mirror else []),
+ "--repository-key-check", str(config.repository_key_check),
+ "--cache-only", str(config.cache_only),
+ *(["--output-dir", str(config.output_dir)] if config.output_dir else []),
+ *(["--workspace-dir", str(config.workspace_dir)] if config.workspace_dir else []),
+ *(["--cache-dir", str(config.cache_dir)] if config.cache_dir else []),
+ "--incremental", str(config.incremental),
+ "--acl", str(config.acl),
+ *([f"--package={package}" for package in config.tools_tree_packages]),
+ "--output", f"{distribution}-tools",
+ *(["--source-date-epoch", str(config.source_date_epoch)] if config.source_date_epoch is not None else []),
+ *([f"--environment={k}='{v}'" for k, v in config.environment.items()]),
+ *([f"--extra-search-path={p}" for p in config.extra_search_paths]),
+ *(["-f"] * args.force),
+ ]
- distribution = config.tools_tree_distribution or config.distribution.default_tools_tree_distribution()
- if not distribution:
- die(f"{config.distribution} does not have a default tools tree distribution",
- hint="use ToolsTreeDistribution= to set one explicitly")
+ with resource_path(mkosi.resources, "mkosi-tools") as r:
+ _, [tools] = parse_config(cmdline + ["--include", os.fspath(r), "build"])
- release = config.tools_tree_release or distribution.default_release()
- mirror = (
- config.tools_tree_mirror or
- (config.mirror if config.mirror and config.distribution == distribution else None)
+ make_executable(
+ *tools.prepare_scripts,
+ *tools.postinst_scripts,
+ *tools.finalize_scripts,
+ *tools.build_scripts,
)
- cmdline = [
- "--directory", "",
- "--distribution", str(distribution),
- *(["--release", release] if release else []),
- *(["--mirror", mirror] if mirror else []),
- "--repository-key-check", str(config.repository_key_check),
- "--cache-only", str(config.cache_only),
- *(["--output-dir", str(config.output_dir)] if config.output_dir else []),
- *(["--workspace-dir", str(config.workspace_dir)] if config.workspace_dir else []),
- *(["--cache-dir", str(config.cache_dir)] if config.cache_dir else []),
- "--incremental", str(config.incremental),
- "--acl", str(config.acl),
- *([f"--package={package}" for package in config.tools_tree_packages]),
- "--output", f"{distribution}-tools",
- *(["--source-date-epoch", str(config.source_date_epoch)] if config.source_date_epoch is not None else []),
- *([f"--environment={k}='{v}'" for k, v in config.environment.items()]),
- *([f"--extra-search-path={p}" for p in config.extra_search_paths]),
- *(["-f"] * args.force),
- ]
-
- with resource_path(mkosi.resources) as r:
- _, [tools] = parse_config(cmdline + ["--include", os.fspath(r / "mkosi-tools"), "build"])
-
tools = dataclasses.replace(tools, image=f"{distribution}-tools")
- if tools not in new:
- new.append(tools)
-
- new.append(dataclasses.replace(config, tools_tree=tools.output_dir_or_cwd() / tools.output))
-
- return new
+ yield tools
def check_workspace_directory(config: Config) -> None:
hint="Use WorkspaceDirectory= to configure a different workspace directory")
+def run_clean(args: Args, config: Config) -> None:
+ become_root()
+ unlink_output(args, config)
+
+
+def run_build(args: Args, config: Config) -> None:
+ become_root()
+ init_mount_namespace()
+
+ # For extra safety when running as root, remount a bunch of stuff read-only.
+ for d in ("/usr", "/etc", "/opt", "/srv", "/boot", "/efi", "/media", "/mnt"):
+ if Path(d).exists():
+ run(["mount", "--rbind", d, d, "--options", "ro"])
+
+ with (
+ complete_step(f"Building {config.name()} image"),
+ mount_usr(config.tools_tree),
+ prepend_to_environ_path(config),
+ ):
+ # After tools have been mounted, check if we have what we need
+ check_tools(Verb.build, config)
+
+ # Create these as the invoking user to make sure they're owned by the user running mkosi.
+ for p in (
+ config.output_dir,
+ config.cache_dir,
+ config.build_dir,
+ config.workspace_dir,
+ ):
+ if p:
+ run(["mkdir", "--parents", p], user=INVOKING_USER.uid, group=INVOKING_USER.gid)
+
+ with acl_toggle_build(config, INVOKING_USER.uid):
+ build_image(args, config)
+
+
def run_verb(args: Args, images: Sequence[Config]) -> None:
+ images = list(images)
+
if args.verb.needs_root() and os.getuid() != 0:
die(f"Must be root to run the {args.verb} command")
for config in images:
check_workspace_directory(config)
- images = finalize_tools(args, images)
- last = images[-1]
-
- if args.verb in (Verb.shell, Verb.boot):
- opname = "acquire shell in" if args.verb == Verb.shell else "boot"
- if last.output_format in (OutputFormat.tar, OutputFormat.cpio):
- die(f"Sorry, can't {opname} a {last.output_format} archive.")
- if last.output_format.use_outer_compression() and last.compress_output:
- die(f"Sorry, can't {opname} a compressed image.")
-
- if (
- args.verb in (Verb.journalctl, Verb.coredumpctl)
- and last.output_format == OutputFormat.disk
- and os.getuid() != 0
- ):
- die(f"Must be root to run the {args.verb} command")
-
for config in images:
if args.verb == Verb.build and not args.force:
check_outputs(config)
for config in images:
try_import(f"mkosi.distributions.{config.distribution}")
- # After we unshare the user namespace, we might not have access to /dev/kvm or related device nodes anymore as
- # access to these might be gated behind the kvm group and we won't be part of the kvm group anymore after unsharing
- # the user namespace. To get around this, open all those device nodes now while we still can so we can pass them as
- # file descriptors to qemu later. Note that we can't pass the kvm file descriptor to qemu until
- # https://gitlab.com/qemu-project/qemu/-/issues/1936 is resolved.
- qemu_device_fds = {
- d: d.open()
- for d in QemuDeviceNode
- if args.verb == Verb.qemu and d.feature(last) != ConfigFeature.disabled and d.available(log=True)
- }
-
# First, process all directory removals because otherwise if different images share directories a later
# image build could end up deleting the output generated by an earlier image build.
if not needs_build(args, config) and args.verb != Verb.clean:
continue
- def target() -> None:
- become_root()
- unlink_output(args, config)
+ if config.tools_tree and config.tools_tree.name == "default":
+ with finalize_default_tools(args, config) as tools:
+ fork_and_wait(lambda: run_clean(args, tools)) # pyright: ignore
- fork_and_wait(target)
+ fork_and_wait(lambda: run_clean(args, config))
if args.verb == Verb.clean:
return
build = False
- for config in images:
- check_inputs(config)
+ for i, config in enumerate(images):
+ with (
+ finalize_default_tools(args, config)
+ if config.tools_tree and config.tools_tree.name == "default"
+ else contextlib.nullcontext()
+ as tools
+ ):
+ images[i] = config = dataclasses.replace(
+ config,
+ tools_tree=tools.output_dir_or_cwd() / tools.output if tools else config.tools_tree,
+ )
+
+ if tools and needs_build(args, tools):
+ check_inputs(tools)
+ fork_and_wait(lambda: run_build(args, tools)) # pyright: ignore
if not needs_build(args, config):
continue
- def target() -> None:
- become_root()
- init_mount_namespace()
-
- # For extra safety when running as root, remount a bunch of stuff read-only.
- for d in ("/usr", "/etc", "/opt", "/srv", "/boot", "/efi", "/media", "/mnt"):
- if Path(d).exists():
- run(["mount", "--rbind", d, d, "--options", "ro"])
-
- with (
- complete_step(f"Building {config.name()} image"),
- mount_usr(config.tools_tree),
- prepend_to_environ_path(config),
- ):
- # After tools have been mounted, check if we have what we need
- check_tools(Verb.build, config)
-
- # Create these as the invoking user to make sure they're owned by the user running mkosi.
- for p in (
- config.output_dir,
- config.cache_dir,
- config.build_dir,
- config.workspace_dir,
- ):
- if p:
- run(["mkdir", "--parents", p], user=INVOKING_USER.uid, group=INVOKING_USER.gid)
-
- with acl_toggle_build(config, INVOKING_USER.uid):
- build_image(args, config)
-
- fork_and_wait(target)
+ check_inputs(config)
+ fork_and_wait(lambda: run_build(args, config))
build = True
if args.verb == Verb.build:
return
+ last = images[-1]
+
+ # After we unshare the user namespace, we might not have access to /dev/kvm or related device nodes anymore as
+ # access to these might be gated behind the kvm group and we won't be part of the kvm group anymore after
+ # unsharing the user namespace. To get around this, open all those device nodes now while we still can so we
+ # can pass them as file descriptors to qemu later. Note that we can't pass the kvm file descriptor to qemu
+ # until https://gitlab.com/qemu-project/qemu/-/issues/1936 is resolved.
+ qemu_device_fds = {
+ d: d.open()
+ for d in QemuDeviceNode
+ if args.verb == Verb.qemu and d.feature(last) != ConfigFeature.disabled and d.available(log=True)
+ }
+
if last.tools_tree and args.verb != Verb.ssh:
become_root()
from collections.abc import Iterable, Iterator, Mapping, Sequence
from pathlib import Path
from types import ModuleType
-from typing import Any, Callable, TypeVar
+from typing import Any, Callable, TypeVar, no_type_check
from mkosi.types import PathString
os.chdir(old)
-def make_executable(path: Path) -> None:
- st = path.stat()
- os.chmod(path, st.st_mode | stat.S_IEXEC)
+def make_executable(*paths: Path) -> None:
+ for path in paths:
+ st = path.stat()
+ os.chmod(path, st.st_mode | stat.S_IEXEC)
def try_import(module: str) -> None:
@contextlib.contextmanager
-def resource_path(mod: ModuleType) -> Iterator[Path]:
+def resource_path(mod: ModuleType, path: str) -> Iterator[Path]:
+
+ # We backport as_file() from python 3.12 here temporarily since it added directory support.
+ # TODO: Remove once minimum python version is 3.12.
+
+ # SPDX-License-Identifier: PSF-2.0
+ # Copied from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py
+
+ @no_type_check
+ @contextlib.contextmanager
+ def _tempfile(
+ reader,
+ suffix='',
+ # gh-93353: Keep a reference to call os.remove() in late Python
+ # finalization.
+ *,
+ _os_remove=os.remove,
+ ):
+ # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
+ # blocks due to the need to close the temporary file to work on Windows
+ # properly.
+ fd, raw_path = tempfile.mkstemp(suffix=suffix)
+ try:
+ try:
+ os.write(fd, reader())
+ finally:
+ os.close(fd)
+ del reader
+ yield Path(raw_path)
+ finally:
+ try:
+ _os_remove(raw_path)
+ except FileNotFoundError:
+ pass
+
+ @no_type_check
+ def _temp_file(path):
+ return _tempfile(path.read_bytes, suffix=path.name)
+
+ @no_type_check
+ def _is_present_dir(path) -> bool:
+ """
+ Some Traversables implement ``is_dir()`` to raise an
+ exception (i.e. ``FileNotFoundError``) when the
+ directory doesn't exist. This function wraps that call
+ to always return a boolean and only return True
+ if there's a dir and it exists.
+ """
+ with contextlib.suppress(FileNotFoundError):
+ return path.is_dir()
+ return False
+
+ @no_type_check
+ @functools.singledispatch
+ def as_file(path):
+ """
+ Given a Traversable object, return that object as a
+ path on the local file system in a context manager.
+ """
+ return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
+
+ @no_type_check
+ @contextlib.contextmanager
+ def _temp_path(dir: tempfile.TemporaryDirectory):
+ """
+ Wrap tempfile.TemporyDirectory to return a pathlib object.
+ """
+ with dir as result:
+ yield Path(result)
+
+ @no_type_check
+ @contextlib.contextmanager
+ def _temp_dir(path):
+ """
+ Given a traversable dir, recursively replicate the whole tree
+ to the file system in a context manager.
+ """
+ assert path.is_dir()
+ with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
+ yield _write_contents(temp_dir, path)
+
+ @no_type_check
+ def _write_contents(target, source):
+ child = target.joinpath(source.name)
+ if source.is_dir():
+ child.mkdir()
+ for item in source.iterdir():
+ _write_contents(child, item)
+ else:
+ child.write_bytes(source.read_bytes())
+ return child
+
t = importlib.resources.files(mod)
- with importlib.resources.as_file(t) as p:
+ with as_file(t.joinpath(path)) as p:
yield p