- fedora
- rocky
- alma
- - gentoo
+ # gentoo (see https://github.com/systemd/mkosi/pull/1313#issuecomment-1406277198)
- opensuse
format:
- directory
EOF
- name: Build ${{ matrix.distro }}/${{ matrix.format }}
- run: sudo python3 -m mkosi build
+ run: python3 -m mkosi build
+
+ # systemd-resolved is enabled by default in Arch/Debian/Ubuntu (systemd default preset) but fails to
+ # start in a systemd-nspawn container with --private-users so we mask it out here to avoid CI failures.
+ # FIXME: Remove when Arch/Debian/Ubuntu ship systemd v253
+ - name: Mask systemd-resolved
+ if: matrix.format == 'directory'
+ run: sudo systemctl --root mkosi.output/${{ matrix.distro }}~*/image mask systemd-resolved
- name: Boot ${{ matrix.distro }}/${{ matrix.format }} systemd-nspawn
if: matrix.format == 'disk' || matrix.format == 'directory'
- name: Boot ${{ matrix.distro }}/${{ matrix.format }} UEFI
if: matrix.format == 'disk'
- run: sudo timeout -k 30 10m python3 -m mkosi qemu
+ run: timeout -k 30 10m python3 -m mkosi qemu
- name: Check ${{ matrix.distro }}/${{ matrix.format }} UEFI
if: matrix.format == 'disk' || matrix.format == 'directory'
## v15
-- Rename `--no-chown` to `--chown` and set it to default to `True`, preserving
- current behaviour.
-- Add `--idmap` option to run `--systemd-nspawn` with ID mapping support. Defaults
- to `True`. `--idmap=no` can be used to prevent usage of ID mapping.
- Migrated to systemd-repart. Many options are dropped in favor of specifying them directly
in repart partition definition files:
- Format=gpt_xxx options are replaced with a single "disk" options. Filesystem to use can now be specified with repart's Format= option
- Removed default kernel command line arguments `rhgb`, `selinux=0` and `audit=0`.
- Dropped --all and --all-directory as this functionality is better implemented by
using a build system.
+- mkosi now builds images without needing root privileges.
+- Removed `--no-chown`, `--idmap` and `--nspawn-keep-unit` options as they were made obsolete by moving to
+ rootless builds.
+- Removed `--source-file-transfer`, `--source-file-transfer-final`, `--source-resolve-symlinks` and
+ `--source-resolve-symlinks-final` in favor of always mounting the source directory into the build image.
+ `--source-file-transfer-final` might be reimplemented in the future using virtiofsd.
## v14
squashfs-tools \
btrfs-progs \
mtools \
- python3-pefile
+ python3-pefile \
+ bubblewrap
sudo pacman-key --init
sudo pacman-key --populate archlinux
sudo apt-get install libfdisk-dev
git clone https://github.com/systemd/systemd --depth=1
meson systemd/build systemd -Drepart=true -Defi=true
- ninja -C systemd/build systemd-nspawn systemd-dissect systemd-repart systemd-analyze bootctl ukify
- sudo ln -svf $PWD/systemd/build/systemd-nspawn /usr/bin/systemd-nspawn
- sudo ln -svf $PWD/systemd/build/systemd-dissect /usr/bin/systemd-dissect
+ ninja -C systemd/build systemd-nspawn systemd-repart bootctl ukify
sudo ln -svf $PWD/systemd/build/systemd-repart /usr/bin/systemd-repart
- sudo ln -svf $PWD/systemd/build/systemd-analyze /usr/bin/systemd-analyze
sudo ln -svf $PWD/systemd/build/bootctl /usr/bin/bootctl
sudo ln -svf $PWD/systemd/build/ukify /usr/bin/ukify
- systemd-nspawn --version
- systemd-dissect --version
systemd-repart --version
bootctl --version
+ ukify --version
- name: Install
shell: bash
image root, so any `CopyFiles=` source paths in partition definition files will
be relative to the image root directory.
-`NoChown=`, `--no-chown`
-
-: By default, if `mkosi` is run inside a `sudo` environment all
- generated artifacts have their UNIX user/group ownership changed to
- the user which invoked `sudo`. With this option this may be turned
- off and all generated files are owned by `root`.
-
`TarStripSELinuxContext=`, `--tar-strip-selinux-context`
: If running on a SELinux-enabled system (Fedora Linux, CentOS, Rocky Linux,
`BuildSources=`, `--build-sources=`
-: Takes a path to a source tree to copy into the development image, if
- the build script is used. This only applies if a build script is
- used, and defaults to the local directory. Use `SourceFileTransfer=`
- to configure how the files are transferred from the host to the
- container image.
+: Takes a path to a source tree to mount into the development image, if
+ the build script is used.
`BuildDirectory=`, `--build-dir=`
: Takes a path to an executable that is used as build script for this
image. If this option is used the build process will be two-phased
instead of single-phased. The specified script is copied onto the
- development image and executed inside an `systemd-nspawn` container
- environment. If this option is not used, but the `mkosi.build` file
- found in the local directory it is automatically used for this
- purpose (also see the "Files" section below). Specify an empty value
- to disable automatic detection.
+ development image and executed inside a namespaced chroot environment.
+ If this option is not used, but the `mkosi.build` file found in the
+ local directory it is automatically used for this purpose (also see
+ the "Files" section below). Specify an empty value to disable
+ automatic detection.
`PrepareScript=`, `--prepare-script=`
: Takes a path to an executable that is invoked inside the image right
after installing the software packages. It is the last step before
the image is cached (if incremental mode is enabled). This script
- is invoked inside a `systemd-nspawn` container environment, and thus
- does not have access to host resources. If this option is not used,
- but an executable script `mkosi.prepare` is found in the local
- directory, it is automatically used for this purpose. Specify an
- empty value to disable automatic detection.
+ is invoked inside a namespaced chroot environment, and thus does not
+ have access to host resources. If this option is not used, but an
+ executable script `mkosi.prepare` is found in the local directory, it
+ is automatically used for this purpose. Specify an empty value to
+ disable automatic detection.
`PostInstallationScript=`, `--postinst-script=`
: Takes a path to an executable that is invoked inside the final image
right after copying in the build artifacts generated in the first
- phase of the build. This script is invoked inside a `systemd-nspawn`
- container environment, and thus does not have access to host
- resources. If this option is not used, but an executable
- `mkosi.postinst` is found in the local directory, it is
- automatically used for this purpose. Specify an empty value to
- disable automatic detection.
+ phase of the build. This script is invoked inside a namespaced chroot
+ environment, and thus does not have access to host resources. If this
+ option is not used, but an executable `mkosi.postinst` is found in the
+ local directory, it is automatically used for this purpose. Specify an
+ empty value to disable automatic detection.
`FinalizeScript=`, `--finalize-script=`
automatically used for this purpose. Specify an empty value to
disable automatic detection.
-`SourceFileTransfer=`, `--source-file-transfer=`
-
-: Configures how the source file tree (as configured with
- `BuildSources=`) is transferred into the container image during the
- first phase of the build. Takes one of `copy-all` (to copy all files
- from the source tree), `copy-git-cached` (to copy only those files
- `git ls-files --cached` lists), `copy-git-others` (to copy only
- those files `git ls-files --others` lists), `mount` to bind mount
- the source tree directly. Defaults to `copy-git-cached` if a `git`
- source tree is detected, otherwise `copy-all`. When you specify
- `copy-git-more`, it is the same as `copy-git-cached`, except it also
- includes the `.git/` directory.
-
-`SourceFileTransferFinal=`, `--source-file-transfer-final=`
-
-: Same as `SourceFileTransfer=`, but for the final image instead of
- the build image. Takes the same values as `SourceFileFransfer=`
- except `mount`. By default, sources are not copied into the final
- image.
-
-`SourceResolveSymlinks=`, `--source-resolve-symlinks`
-
-: If given, any symbolic links in the source file tree are resolved and the
- file contents are copied to the build image. If not given, they are left as
- symbolic links. This only applies if `SourceFileTransfer=` is `copy-all`.
- Defaults to leaving them as symbolic links.
-
-`SourceResolveSymlinksFinal=`, `--source-resolve-symlinks-final`
-
-: Same as `SourceResolveSymlinks=`, but for the final image instead of
- the build image.
-
`WithNetwork=`, `--with-network`
: When true, enables network connectivity while the build script
: Space-delimited list of additional arguments to pass when invoking
qemu.
-`NspawnKeepUnit=`, `--nspawn-keep-unit`
-
-: When used, this option instructs underlying calls of systemd-nspawn to
- use the current unit scope, instead of creating a dedicated transcient
- scope unit for the containers. This option should be used when mkosi is
- run by a service unit.
-
`Netdev=`, `--netdev`
: When used with the boot or qemu verbs, this option creates a virtual
image. The *development* image is used to build the project in the
current working directory (the *source* tree). For that the whole
directory is copied into the image, along with the `mkosi.build`
- script. The script is then invoked inside the image (via
- `systemd-nspawn`), with `$SRCDIR` pointing to the *source*
- tree. `$DESTDIR` points to a directory where the script should place
- any files generated it would like to end up in the *final*
- image. Note that `make`/`automake`/`meson` based build systems
- generally honor `$DESTDIR`, thus making it very natural to build
- *source* trees from the build script. After the *development* image
- was built and the build script ran inside of it, it is removed
+ script. The script is then invoked inside the image, with `$SRCDIR`
+ pointing to the *source* tree. `$DESTDIR` points to a directory where
+ the script should place any files generated it would like to end up
+ in the *final* image. Note that `make`/`automake`/`meson` based build
+ systems generally honor `$DESTDIR`, thus making it very natural to
+ build *source* trees from the build script. After the *development*
+ image was built and the build script ran inside of it, it is removed
again. After that the *final* image is built, without any *source*
tree or build script copied in. However, this time the contents of
`$DESTDIR` are added into the image.
necessary dependencies. For example, on *Fedora Linux* you need:
```bash
-dnf install btrfs-progs apt debootstrap dosfstools mtools edk2-ovmf e2fsprogs squashfs-tools gnupg python3 tar xfsprogs xz zypper sbsigntools
+dnf install bubblewrap btrfs-progs apt debootstrap dosfstools mtools edk2-ovmf e2fsprogs squashfs-tools gnupg python3 tar xfsprogs xz zypper sbsigntools
```
On Debian/Ubuntu it might be necessary to install the `ubuntu-keyring`,
you want to build. `debootstrap` on Debian only pulls in the Debian keyring
on its own, and the version on Ubuntu only the one from Ubuntu.
-Note that the minimum required Python version is 3.7.
+Note that the minimum required Python version is 3.9.
# REFERENCES
* [Primary mkosi git repository on GitHub](https://github.com/systemd/mkosi/)
import configparser
import contextlib
import crypt
-import ctypes
-import ctypes.util
import dataclasses
import datetime
import errno
import os
import platform
import re
+import resource
import shlex
import shutil
import string
from collections.abc import Iterable, Iterator, Sequence
from pathlib import Path
from textwrap import dedent, wrap
-from typing import (
- TYPE_CHECKING,
- Any,
- BinaryIO,
- Callable,
- NoReturn,
- Optional,
- TextIO,
- TypeVar,
- Union,
- cast,
-)
+from typing import Any, Callable, NoReturn, Optional, TextIO, TypeVar, Union, cast
from mkosi.backend import (
- ARG_DEBUG,
Distribution,
ManifestFormat,
MkosiConfig,
- MkosiException,
- MkosiNotSupportedException,
- MkosiPrinter,
MkosiState,
OutputFormat,
- SourceFileTransfer,
Verb,
- chown_to_running_user,
+ current_user_uid_gid,
detect_distribution,
- die,
+ flatten,
+ format_rlimit,
is_centos_variant,
is_rpm_distribution,
- mkdirp_chown_current_user,
- nspawn_knows_arg,
- nspawn_rlimit_params,
- nspawn_version,
patch_file,
path_relative_to_cwd,
- run,
- run_workspace_command,
- scandir_recursive,
set_umask,
should_compress_output,
- spawn,
tmp_dir,
- warn,
)
from mkosi.install import (
add_dropin_config,
flock,
install_skeleton_trees,
)
+from mkosi.log import (
+ ARG_DEBUG,
+ MkosiException,
+ MkosiNotSupportedException,
+ MkosiPrinter,
+ die,
+ warn,
+)
from mkosi.manifest import Manifest
-from mkosi.mounts import dissect_and_mount, mount_bind, mount_overlay, mount_tmpfs
+from mkosi.mounts import dissect_and_mount, mount_bind, mount_overlay, scandir_recursive
from mkosi.remove import unlink_try_hard
+from mkosi.run import (
+ become_root,
+ fork_and_wait,
+ init_mount_namespace,
+ run,
+ run_workspace_command,
+ spawn,
+)
+from mkosi.types import PathString, TempDir
complete_step = MkosiPrinter.complete_step
color_error = MkosiPrinter.color_error
__version__ = "14"
-# These types are only generic during type checking and not at runtime, leading
-# to a TypeError during compilation.
-# Let's be as strict as we can with the description for the usage we have.
-if TYPE_CHECKING:
- CompletedProcess = subprocess.CompletedProcess[Any]
- TempDir = tempfile.TemporaryDirectory[str]
-else:
- CompletedProcess = subprocess.CompletedProcess
- TempDir = tempfile.TemporaryDirectory
-
-SomeIO = Union[BinaryIO, TextIO]
-PathString = Union[Path, str]
-
MKOSI_COMMANDS_NEED_BUILD = (Verb.shell, Verb.boot, Verb.qemu, Verb.serve)
-MKOSI_COMMANDS_SUDO = (Verb.build, Verb.clean, Verb.shell, Verb.boot)
+MKOSI_COMMANDS_SUDO = (Verb.shell, Verb.boot)
MKOSI_COMMANDS_CMDLINE = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.ssh)
DRACUT_SYSTEMD_EXTRAS = [
MkosiPrinter.print_step(" ".join(shlex.quote(str(x)) for x in cmdline) + "\n")
-CLONE_NEWNS = 0x00020000
-
# EFI has its own conventions too
EFI_ARCHITECTURES = {
"x86_64": "x64",
}
-def unshare(flags: int) -> None:
- libc_name = ctypes.util.find_library("c")
- if libc_name is None:
- die("Could not find libc")
- libc = ctypes.CDLL(libc_name, use_errno=True)
-
- if libc.unshare(ctypes.c_int(flags)) != 0:
- e = ctypes.get_errno()
- raise OSError(e, os.strerror(e))
-
-
def format_bytes(num_bytes: int) -> str:
if num_bytes >= 1024 * 1024 * 1024:
return f"{num_bytes/1024**3 :0.1f}G"
return f"{num_bytes}B"
-@complete_step("Detaching namespace")
-def init_namespace() -> None:
- unshare(CLONE_NEWNS)
- run(["mount", "--make-rslave", "/"])
-
def setup_workspace(config: MkosiConfig) -> TempDir:
with complete_step("Setting up temporary workspace.", "Temporary workspace set up in {.name}") as output:
while str(p).startswith(str(config.build_sources)):
p = p.parent
- d = tempfile.TemporaryDirectory(dir=p, prefix=f"mkosi.{config.build_sources.name}.tmp")
+ d = tempfile.TemporaryDirectory(dir=p, prefix=f".mkosi.{config.build_sources.name}.tmp")
output.append(d)
return d
workdir = state.workspace / "workdir"
workdir.mkdir()
stack.enter_context(mount_overlay(base, state.root, workdir, state.root))
- else:
- # always have a root of the tree as a mount point so we can recursively unmount anything that
- # ends up mounted there.
- stack.enter_context(mount_bind(state.root))
-
- # Make sure /tmp and /run are not part of the image
- stack.enter_context(mount_tmpfs(state.root / "run"))
- stack.enter_context(mount_tmpfs(state.root / "tmp"))
if state.do_run_build_script and state.config.include_dir and not cached:
stack.enter_context(mount_bind(state.config.include_dir, state.root / "usr/include"))
etc_hostname.write_text(state.config.hostname + "\n")
-@contextlib.contextmanager
-def mount_cache(state: MkosiState) -> Iterator[None]:
- cache_paths = state.installer.cache_path()
-
- # We can't do this in mount_image() yet, as /var itself might have to be created as a subvolume first
- with complete_step("Mounting Package Cache", "Unmounting Package Cache"), contextlib.ExitStack() as stack:
- for cache_path in cache_paths:
- stack.enter_context(mount_bind(state.cache, state.root / cache_path))
- yield
-
-
def configure_dracut(state: MkosiState, cached: bool) -> None:
if not state.config.bootable or state.do_run_build_script or cached:
return
return
with complete_step("Setting up basic OS tree…"):
+ state.root.mkdir(mode=0o755, exist_ok=True)
# We need an initialized machine ID for the build & boot logic to work
state.root.joinpath("etc").mkdir(mode=0o755, exist_ok=True)
state.root.joinpath("etc/machine-id").write_text(f"{state.machine_id}\n")
state.root.joinpath("etc/kernel/install.conf").write_text("layout=bls\n")
-def flatten(lists: Iterable[Iterable[T]]) -> list[T]:
- """Flatten a sequence of sequences into a single list."""
- return list(itertools.chain.from_iterable(lists))
-
-
def clean_paths(
root: Path,
globs: Sequence[str],
if cached:
return
- with mount_cache(state):
- state.installer.install(state)
+ state.installer.install(state)
def remove_packages(state: MkosiState) -> None:
""")
-def nspawn_id_map_supported() -> bool:
- if nspawn_version() < 252:
- return False
-
- ret = run(["systemd-analyze", "compare-versions", platform.release(), ">=", "5.12"], check=False)
- return ret.returncode == 0
-
-
-def nspawn_params_for_build_sources(config: MkosiConfig, sft: SourceFileTransfer) -> list[str]:
- params = ["--setenv=SRCDIR=/root/src",
- "--chdir=/root/src"]
- if sft == SourceFileTransfer.mount:
- idmap_opt = ":rootidmap" if nspawn_id_map_supported() and config.idmap else ""
- params += [f"--bind={config.build_sources}:/root/src{idmap_opt}"]
-
- return params
+def cache_params(state: MkosiState, root: Path) -> list[PathString]:
+ return flatten(("--bind", state.config.cache_path, root / p) for p in state.installer.cache_path())
def run_prepare_script(state: MkosiState, cached: bool) -> None:
verb = "build" if state.do_run_build_script else "final"
- with mount_cache(state), complete_step("Running prepare script…"):
-
- # We copy the prepare script into the build tree. We'd prefer
- # mounting it into the tree, but for that we'd need a good
- # place to mount it to. But if we create that we might as well
- # just copy the file anyway.
-
- shutil.copy2(state.config.prepare_script, state.root / "root/prepare")
+ with complete_step("Running prepare script…"):
+ bwrap: list[PathString] = [
+ "--bind", state.config.build_sources, "/root/src",
+ "--bind", state.config.prepare_script, "/root/prepare",
+ *cache_params(state, Path("/")),
+ "--chdir", "/root/src",
+ ]
- nspawn_params = nspawn_params_for_build_sources(state.config, SourceFileTransfer.mount)
- run_workspace_command(state, ["/root/prepare", verb],
- network=True, nspawn_params=nspawn_params, env=state.environment)
+ run_workspace_command(state, ["/root/prepare", verb], network=True, bwrap_params=bwrap,
+ env=dict(SRCDIR="/root/src"))
srcdir = state.root / "root/src"
if srcdir.exists():
- os.rmdir(srcdir)
+ srcdir.rmdir()
- os.unlink(state.root / "root/prepare")
+ state.root.joinpath("root/prepare").unlink()
def run_postinst_script(state: MkosiState) -> None:
verb = "build" if state.do_run_build_script else "final"
- with mount_cache(state), complete_step("Running postinstall script…"):
-
- # We copy the postinst script into the build tree. We'd prefer
- # mounting it into the tree, but for that we'd need a good
- # place to mount it to. But if we create that we might as well
- # just copy the file anyway.
+ with complete_step("Running postinstall script…"):
+ bwrap: list[PathString] = [
+ "--bind", state.config.postinst_script, "/root/postinst",
+ *cache_params(state, Path("/")),
+ ]
- shutil.copy2(state.config.postinst_script, state.root / "root/postinst")
+ run_workspace_command(state, ["/root/postinst", verb], bwrap_params=bwrap,
+ network=state.config.with_network is True)
- run_workspace_command(state, ["/root/postinst", verb],
- network=(state.config.with_network is True), env=state.environment)
state.root.joinpath("root/postinst").unlink()
return
with complete_step("Installing boot loader…"):
- run(["bootctl", "install", "--root", state.root], env={"SYSTEMD_ESP_PATH": "/boot"})
+ run(["bootctl", "install", "--root", state.root], env={"SYSTEMD_ESP_PATH": "/boot", **os.environ})
def install_extra_trees(state: MkosiState) -> None:
with complete_step("Copying in extra file trees…"):
for tree in state.config.extra_trees:
if tree.is_dir():
- copy_path(tree, state.root)
+ copy_path(tree, state.root, preserve_owner=False)
else:
# unpack_archive() groks Paths, but mypy doesn't know this.
# Pretend that tree is a str.
os.chdir(c)
-def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTransfer) -> None:
- what_files = ["--exclude-standard", "--cached"]
- if source_file_transfer == SourceFileTransfer.copy_git_others:
- what_files += ["--others", "--exclude=.mkosi-*"]
-
- uid = int(os.getenv("SUDO_UID", 0))
-
- c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=subprocess.PIPE, text=False, user=uid)
- files = {x.decode("utf-8") for x in c.stdout.rstrip(b"\0").split(b"\0")}
-
- # Add the .git/ directory in as well.
- if source_file_transfer == SourceFileTransfer.copy_git_more:
- top = os.path.join(src, ".git/")
- for path, _, filenames in os.walk(top):
- for filename in filenames:
- fp = os.path.join(path, filename) # full path
- fr = os.path.join(".git/", fp.removeprefix(top)) # relative to top
- files.add(fr)
-
- # Get submodule files
- c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=subprocess.PIPE, text=True, user=uid)
- submodules = {x.split()[1] for x in c.stdout.splitlines()}
-
- # workaround for git ls-files returning the path of submodules that we will
- # still parse
- files -= submodules
-
- for sm in submodules:
- sm = Path(sm)
- c = run(
- ["git", "-C", src / sm, "ls-files", "-z"] + what_files,
- stdout=subprocess.PIPE,
- text=False,
- user=uid,
- )
- files |= {sm / x.decode("utf-8") for x in c.stdout.rstrip(b"\0").split(b"\0")}
- files -= submodules
-
- # Add the .git submodule file well.
- if source_file_transfer == SourceFileTransfer.copy_git_more:
- files.add(os.path.join(sm, ".git"))
-
- del c
-
- dest.mkdir(exist_ok=True)
-
- with chdir(src):
- run(["cp", "--parents", "--archive", "--reflink=auto", *files, dest])
-
-
-def install_build_src(state: MkosiState) -> None:
- if state.for_cache:
- return
-
- if state.do_run_build_script:
- if state.config.build_script is not None:
- with complete_step("Copying in build script…"):
- copy_path(state.config.build_script, state.root / "root" / state.config.build_script.name)
- else:
- return
-
- sft: Optional[SourceFileTransfer] = None
- resolve_symlinks: bool = False
- if state.do_run_build_script:
- sft = state.config.source_file_transfer
- resolve_symlinks = state.config.source_resolve_symlinks
- else:
- sft = state.config.source_file_transfer_final
- resolve_symlinks = state.config.source_resolve_symlinks_final
-
- if sft is None:
- return
-
- with complete_step("Copying in sources…"):
- target = state.root / "root/src"
-
- if sft in (
- SourceFileTransfer.copy_git_others,
- SourceFileTransfer.copy_git_cached,
- SourceFileTransfer.copy_git_more,
- ):
- copy_git_files(state.config.build_sources, target, source_file_transfer=sft)
- elif sft == SourceFileTransfer.copy_all:
- ignore = shutil.ignore_patterns(
- ".git",
- ".mkosi-*",
- "*.cache-pre-dev",
- "*.cache-pre-inst",
- f"{state.config.output_dir.name}/" if state.config.output_dir else "mkosi.output/",
- f"{state.config.workspace_dir.name}/" if state.config.workspace_dir else "mkosi.workspace/",
- f"{state.config.cache_path.name}/" if state.config.cache_path else "mkosi.cache/",
- f"{state.config.build_dir.name}/" if state.config.build_dir else "mkosi.builddir/",
- f"{state.config.include_dir.name}/" if state.config.include_dir else "mkosi.includedir/",
- f"{state.config.install_dir.name}/" if state.config.install_dir else "mkosi.installdir/",
- )
- shutil.copytree(state.config.build_sources, target, symlinks=not resolve_symlinks, ignore=ignore)
-
-
def install_build_dest(state: MkosiState) -> None:
if state.do_run_build_script:
return
return
with complete_step("Copying in build tree…"):
- copy_path(install_dir(state), state.root)
+ # The build is executed as a regular user, so we don't want to copy ownership in this scenario.
+ copy_path(install_dir(state), state.root, preserve_owner=False)
def xz_binary() -> str:
)
-def compress_output(config: MkosiConfig, src: Path) -> None:
+def compress_output(config: MkosiConfig, src: Path, uid: int, gid: int) -> None:
compress = should_compress_output(config)
if not src.is_file():
if not compress:
# If we shan't compress, then at least make the output file sparse
with complete_step(f"Digging holes into output file {src}…"):
- run(["fallocate", "--dig-holes", src])
+ run(["fallocate", "--dig-holes", src], user=uid, group=gid)
else:
with complete_step(f"Compressing output file {src}…"):
- run(compressor_command(compress, src))
+ run(compressor_command(compress, src), user=uid, group=gid)
def qcow2_output(state: MkosiState) -> None:
run(cmdline)
+def acl_toggle_remove(root: Path, uid: int, *, allow: bool) -> None:
+ ret = run(
+ [
+ "setfacl",
+ "--physical",
+ "--modify" if allow else "--remove",
+ f"user:{uid}:rwx" if allow else f"user:{uid}",
+ "-",
+ ],
+ check=False,
+ text=True,
+ # Supply files via stdin so we don't clutter --debug run output too much
+ input="\n".join([str(root), *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
+ )
+ if ret.returncode != 0:
+ warn("Failed to set ACLs, you'll need root privileges to remove some generated files/directories")
+
+
def save_cache(state: MkosiState) -> None:
cache = cache_tree_path(state.config, is_final_image=False) if state.do_run_build_script else cache_tree_path(state.config, is_final_image=True)
with complete_step("Installing cache copy…", f"Installed cache copy {path_relative_to_cwd(cache)}"):
unlink_try_hard(cache)
shutil.move(state.root, cache)
-
- if state.config.chown:
- chown_to_running_user(cache)
+ acl_toggle_remove(cache, state.uid, allow=True)
def dir_size(path: PathString) -> int:
cache = workspace / "cache"
else:
cache = config.cache_path
- mkdirp_chown_current_user(cache, chown=config.chown, mode=0o755)
return cache
"""
lines = text.splitlines()
subindent = ' ' if lines[0].endswith(':') else ''
- return list(itertools.chain.from_iterable(wrap(line, width,
- break_long_words=False, break_on_hyphens=False,
- subsequent_indent=subindent) for line in lines))
+ return flatten(wrap(line, width, break_long_words=False, break_on_hyphens=False,
+ subsequent_indent=subindent) for line in lines)
class ArgumentParserMkosi(argparse.ArgumentParser):
return parse_boolean(value)
-def parse_source_file_transfer(value: str) -> Optional[SourceFileTransfer]:
- if value == "":
- return None
- try:
- return SourceFileTransfer(value)
- except Exception as exp:
- raise argparse.ArgumentTypeError(str(exp))
-
-
def parse_base_packages(value: str) -> Union[str, bool]:
if value == "conditional":
return value
group.add_argument("--hostname", help="Set hostname")
group.add_argument("--image-version", help="Set version for image")
group.add_argument("--image-id", help="Set ID for image")
- group.add_argument(
- "--chown",
- metavar="BOOL",
- action=BooleanAction,
- default=True,
- help="When running with sudo, reassign ownership of the generated files to the original user",
- ) # NOQA: E501
- group.add_argument(
- "--idmap",
- metavar="BOOL",
- action=BooleanAction,
- default=True,
- help="Use systemd-nspawn's rootidmap option for bind-mounted directories.",
- )
group.add_argument(
"--tar-strip-selinux-context",
metavar="BOOL",
type=script_path,
metavar="PATH",
)
- group.add_argument(
- "--source-file-transfer",
- type=parse_source_file_transfer,
- choices=[*list(SourceFileTransfer), None],
- metavar="METHOD",
- default=None,
- help='\n'.join(('How to copy build sources to the build image:',
- *(f"'{k}': {v}" for k, v in SourceFileTransfer.doc().items()),
- '(default: copy-git-others if in a git repository, otherwise copy-all)')),
- )
- group.add_argument(
- "--source-file-transfer-final",
- type=parse_source_file_transfer,
- choices=[*list(SourceFileTransfer), None],
- metavar="METHOD",
- default=None,
- help='\n'.join(('How to copy build sources to the final image:',
- *(f"'{k}': {v}" for k, v in SourceFileTransfer.doc().items()
- if k != SourceFileTransfer.mount),
- '(default: None)')),
- )
- group.add_argument(
- "--source-resolve-symlinks",
- metavar="BOOL",
- action=BooleanAction,
- help=("If true, symbolic links in the build sources are followed and the "
- "file contents copied to the build image. If false, they are left as "
- "symbolic links. "
- "Only applies if --source-file-transfer-final is set to 'copy-all'.\n"
- "(default: false)"),
- )
- group.add_argument(
- "--source-resolve-symlinks-final",
- metavar="BOOL",
- action=BooleanAction,
- help=("If true, symbolic links in the build sources are followed and the "
- "file contents copied to the final image. If false, they are left as "
- "symbolic links in the final image. "
- "Only applies if --source-file-transfer-final is set to 'copy-all'.\n"
- "(default: false)"),
- )
group.add_argument(
"--with-network",
action=WithNetworkAction,
# arguments.
help=argparse.SUPPRESS,
)
- group.add_argument(
- "--nspawn-keep-unit",
- metavar="BOOL",
- action=BooleanAction,
- help="If specified, underlying systemd-nspawn containers use the resources of the current unit.",
- )
group.add_argument(
"--network-veth", # Compatibility option
dest="netdev",
def load_args(args: argparse.Namespace) -> MkosiConfig:
- global ARG_DEBUG
ARG_DEBUG.update(args.debug)
args_find_path(args, "nspawn_settings", "mkosi.nspawn")
if args.qemu_headless and not any("loglevel" in x for x in args.kernel_command_line):
args.kernel_command_line.append("loglevel=4")
- if args.source_file_transfer is None:
- if os.path.exists(".git") or args.build_sources.joinpath(".git").exists():
- args.source_file_transfer = SourceFileTransfer.copy_git_others
- else:
- args.source_file_transfer = SourceFileTransfer.copy_all
-
- if args.source_file_transfer_final == SourceFileTransfer.mount and args.verb == Verb.qemu:
- die("Sorry, --source-file-transfer-final=mount is not supported when booting in QEMU")
-
if args.skip_final_phase and args.verb != Verb.build:
die("--skip-final-phase can only be used when building an image using 'mkosi build'", MkosiNotSupportedException)
print(" Remove Packages:", line_join_list(config.remove_packages))
print(" Build Sources:", config.build_sources)
- print(" Source File Transfer:", none_to_none(config.source_file_transfer))
- print("Source File Transfer Final:", none_to_none(config.source_file_transfer_final))
print(" Build Directory:", none_to_none(config.build_dir))
print(" Include Directory:", none_to_none(config.include_dir))
print(" Install Directory:", none_to_none(config.install_dir))
print(" Netdev:", yes_no(config.netdev))
-def make_output_dir(config: MkosiConfig) -> None:
+def make_output_dir(state: MkosiState) -> None:
"""Create the output directory if set and not existing yet"""
- if config.output_dir is None:
+ if state.config.output_dir is None:
return
- mkdirp_chown_current_user(config.output_dir, chown=config.chown, mode=0o755)
+ run(["mkdir", "-p", state.config.output_dir], user=state.uid, group=state.gid)
-def make_build_dir(config: MkosiConfig) -> None:
+def make_build_dir(state: MkosiState) -> None:
"""Create the build directory if set and not existing yet"""
- if config.build_dir is None:
+ if state.config.build_dir is None:
return
- mkdirp_chown_current_user(config.build_dir, chown=config.chown, mode=0o755)
+ run(["mkdir", "-p", state.config.build_dir], user=state.uid, group=state.gid)
-def make_cache_dir(config: MkosiConfig) -> None:
- """Create the output directory if set and not existing yet"""
- # TODO: mypy complains that having the same structure as above, makes the
- # return on None unreachable code. I can't see right now, why it *should* be
- # unreachable, so invert the structure here to be on the safe side.
- if config.cache_path is not None:
- mkdirp_chown_current_user(config.cache_path, chown=config.chown, mode=0o755)
+def make_cache_dir(state: MkosiState) -> None:
+ """Create the cache directory if set and not existing yet"""
+ run(["mkdir", "-p", state.config.cache_path], user=state.uid, group=state.gid)
-def configure_ssh(state: MkosiState, cached: bool) -> None:
- if state.do_run_build_script or not state.config.ssh:
+def make_install_dir(state: MkosiState) -> None:
+ # If no install directory is configured, it'll be located in the workspace which is owned by root in the
+ # userns so we have to run as the same user.
+ run(["mkdir", "-p", install_dir(state)],
+ user=state.uid if state.config.install_dir else 0,
+ group=state.gid if state.config.install_dir else 0)
+ # Make sure the install dir is always owned by the user running mkosi since the build will be running as
+ # the same user and needs to be able to write files here.
+ os.chown(install_dir(state), state.uid, state.gid)
+
+
+def configure_ssh(state: MkosiState) -> None:
+ if state.do_run_build_script or state.for_cache or not state.config.ssh:
return
if state.config.distribution in (Distribution.debian, Distribution.ubuntu):
else:
unit = "sshd"
- # We cache the enable sshd step but not the keygen step because it creates a separate file on the host
- # which introduces non-trivial issue when trying to cache it.
-
- if not cached:
- run(["systemctl", "--root", state.root, "enable", unit])
-
- if state.for_cache:
- return
+ run(["systemctl", "--root", state.root, "enable", unit])
authorized_keys = state.root / "root/.ssh/authorized_keys"
if state.config.ssh_key:
- copy_path(Path(f"{state.config.ssh_key}.pub"), authorized_keys)
+ copy_path(Path(f"{state.config.ssh_key}.pub"), authorized_keys, preserve_owner=False)
elif state.config.ssh_agent is not None:
- env = {"SSH_AUTH_SOCK": state.config.ssh_agent}
+ env = {"SSH_AUTH_SOCK": str(state.config.ssh_agent), **os.environ}
result = run(["ssh-add", "-L"], env=env, text=True, stdout=subprocess.PIPE)
authorized_keys.write_text(result.stdout)
else:
input="y\n",
text=True,
stdout=subprocess.DEVNULL,
+ user=state.uid,
+ group=state.gid,
)
authorized_keys.parent.mkdir(parents=True, exist_ok=True)
- copy_path(p.with_suffix(".pub"), authorized_keys)
+ copy_path(p.with_suffix(".pub"), authorized_keys, preserve_owner=False)
os.remove(p.with_suffix(".pub"))
authorized_keys.chmod(0o600)
with complete_step(f"Basing off cached tree {cache}", "Copied cached tree"):
copy_path(cache, state.root)
+ acl_toggle_remove(state.root, state.uid, allow=False)
return True
if state.config.build_script is None and state.do_run_build_script:
return
- make_build_dir(state.config)
-
cached = reuse_cache_tree(state)
if state.for_cache and cached:
return
configure_dracut(state, cached)
configure_netdev(state, cached)
run_prepare_script(state, cached)
- install_build_src(state)
install_build_dest(state)
install_extra_trees(state)
run_kernel_install(state, cached)
install_boot_loader(state)
- configure_ssh(state, cached)
+ configure_ssh(state)
run_postinst_script(state)
run_preset_all(state)
secure_boot_configure_auto_enroll(state)
if state.config.build_script is None:
return
- idmap_opt = ":rootidmap" if nspawn_id_map_supported() and state.config.idmap else ""
-
with complete_step("Running build script…"):
- os.makedirs(install_dir(state), mode=0o755, exist_ok=True)
-
- with_network = 1 if state.config.with_network is True else 0
-
- cmdline = [
- "systemd-nspawn",
- "--quiet",
- f"--directory={state.root}",
- f"--machine=mkosi-{uuid.uuid4().hex}",
- "--as-pid2",
- "--link-journal=no",
- "--register=no",
- f"--bind={install_dir(state)}:/root/dest{idmap_opt}",
- f"--bind={state.var_tmp()}:/var/tmp{idmap_opt}",
- f"--setenv=WITH_DOCS={one_zero(state.config.with_docs)}",
- f"--setenv=WITH_TESTS={one_zero(state.config.with_tests)}",
- f"--setenv=WITH_NETWORK={with_network}",
- "--setenv=DESTDIR=/root/dest",
- *nspawn_rlimit_params(),
+ # Bubblewrap creates bind mount point parent directories with restrictive permissions so we create
+ # the work directory outselves here.
+ state.root.joinpath("work").mkdir(mode=0o755)
+
+ bwrap: list[PathString] = [
+ "--bind", state.config.build_sources, "/work/src",
+ "--bind", state.config.build_script, f"/work/{state.config.build_script.name}",
+ "--bind", install_dir(state), "/work/dest",
+ "--chdir", "/work/src",
]
- # TODO: Use --autopipe once systemd v247 is widely available.
- console_arg = f"--console={'interactive' if sys.stdout.isatty() else 'pipe'}"
- if nspawn_knows_arg(console_arg):
- cmdline += [console_arg]
+ env = dict(
+ WITH_DOCS=one_zero(state.config.with_docs),
+ WITH_TESTS=one_zero(state.config.with_tests),
+ WITH_NETWORK=one_zero(state.config.with_network is True),
+ SRCDIR="/work/src",
+ DESTDIR="/work/dest",
+ )
if state.config.config_path is not None:
- cmdline += [
- f"--setenv=MKOSI_CONFIG={state.config.config_path}",
- f"--setenv=MKOSI_DEFAULT={state.config.config_path}"
- ]
-
- cmdline += nspawn_params_for_build_sources(state.config, state.config.source_file_transfer)
+ env |= dict(
+ MKOSI_CONFIG=str(state.config.config_path),
+ MKOSI_DEFAULT=str(state.config.config_path),
+ )
if state.config.build_dir is not None:
- cmdline += ["--setenv=BUILDDIR=/root/build",
- f"--bind={state.config.build_dir}:/root/build{idmap_opt}"]
+ bwrap += ["--bind", state.config.build_dir, "/work/build"]
+ env |= dict(BUILDDIR="/work/build")
if state.config.include_dir is not None:
- cmdline += [f"--bind={state.config.include_dir}:/usr/include{idmap_opt}"]
-
- if state.config.with_network is True:
- # If we're using the host network namespace, use the same resolver
- cmdline += ["--bind-ro=/etc/resolv.conf"]
- else:
- cmdline += ["--private-network"]
-
- if state.config.nspawn_keep_unit:
- cmdline += ["--keep-unit"]
+ bwrap += ["--bind", state.config.include_dir, "/usr/include"]
- cmdline += [f"--setenv={env}={value}" for env, value in state.environment.items()]
-
- cmdline += [f"/root/{state.config.build_script.name}"]
-
- # When we're building the image because it's required for another verb, any passed arguments are most
- # likely intended for the target verb, and not for "build", so don't add them in that case.
+ cmd = ["setpriv", f"--reuid={state.uid}", f"--regid={state.gid}", "--clear-groups", f"/work/{state.config.build_script.name}"]
+ # When we're building the image because it's required for another verb, any passed arguments are
+ # most likely intended for the target verb, and not for "build", so don't add them in that case.
if state.config.verb == Verb.build:
- cmdline += state.config.cmdline
+ cmd += state.config.cmdline
+
+ # build-script output goes to stdout so we can run language servers from within mkosi
+ # build-scripts. See https://github.com/systemd/mkosi/pull/566 for more information.
+ run_workspace_command(state, cmd, network=state.config.with_network is True, bwrap_params=bwrap,
+ stdout=sys.stdout, env=env)
- # build-script output goes to stdout so we can run language servers from within mkosi build-scripts.
- # See https://github.com/systemd/mkosi/pull/566 for more information.
- result = run(cmdline, stdout=sys.stdout, check=False)
- if result.returncode != 0:
- if "build-script" in ARG_DEBUG:
- run(cmdline[:-1], check=False)
- die(f"Build script returned non-zero exit code {result.returncode}.")
+ state.root.joinpath("work/dest").rmdir()
+ state.root.joinpath("work/src").rmdir()
+ state.root.joinpath("work/build").rmdir()
+ state.root.joinpath("work").joinpath(state.config.build_script.name).unlink()
+ state.root.joinpath("work").rmdir()
def need_cache_trees(state: MkosiState) -> bool:
unlink_try_hard(state.var_tmp())
-def build_stuff(config: MkosiConfig) -> None:
- make_output_dir(config)
- make_cache_dir(config)
+def build_stuff(uid: int, gid: int, config: MkosiConfig) -> None:
workspace = setup_workspace(config)
workspace_dir = Path(workspace.name)
cache = setup_package_cache(config, workspace_dir)
+ state = MkosiState(
+ uid=uid,
+ gid=gid,
+ config=config,
+ workspace=workspace_dir,
+ cache=cache,
+ do_run_build_script=False,
+ machine_id=config.machine_id or uuid.uuid4().hex,
+ for_cache=False,
+ )
+
manifest = Manifest(config)
+ make_output_dir(state)
+ make_cache_dir(state)
+ make_install_dir(state)
+ make_build_dir(state)
+
# Make sure tmpfiles' aging doesn't interfere with our workspace
# while we are working on it.
with flock(workspace_dir):
- state = MkosiState(
- config=config,
- workspace=workspace_dir,
- cache=cache,
- do_run_build_script=False,
- machine_id=config.machine_id or uuid.uuid4().hex,
- for_cache=False,
- )
-
# If caching is requested, then make sure we have cache trees around we can make use of
if need_cache_trees(state):
calculate_signature(state)
save_manifest(state, manifest)
+ if state.config.cache_path:
+ acl_toggle_remove(state.config.cache_path, state.uid, allow=True)
+
for p in state.config.output_paths():
if state.staging.joinpath(p.name).exists():
shutil.move(state.staging / p.name, p)
+ if p != state.config.output or state.config.output_format != OutputFormat.directory:
+ os.chown(p, state.uid, state.gid)
+ else:
+ acl_toggle_remove(p, uid, allow=True)
if p in (state.config.output, state.config.output_split_kernel):
- compress_output(state.config, p)
- if state.config.chown and p.exists():
- chown_to_running_user(p)
+ compress_output(state.config, p, uid=state.uid, gid=state.gid)
for p in state.staging.iterdir():
shutil.move(p, state.config.output.parent / p.name)
+ os.chown(state.config.output.parent / p.name, state.uid, state.gid)
if p.name.startswith(state.config.output.name):
- compress_output(state.config, p)
+ compress_output(state.config, p, uid=state.uid, gid=state.gid)
def check_root() -> None:
die("Must be invoked as root.")
-def check_native(config: MkosiConfig) -> None:
- if not config.architecture_is_native() and config.build_script and nspawn_version() < 250:
- die("Cannot (currently) override the architecture and run build commands")
-
-
@contextlib.contextmanager
def suppress_stacktrace() -> Iterator[None]:
try:
return True
+def nspawn_knows_arg(arg: str) -> bool:
+ # Specify some extra incompatible options so nspawn doesn't try to boot a container in the current
+ # directory if it has a compatible layout.
+ return "unrecognized option" not in run(["systemd-nspawn", arg,
+ "--directory", "/dev/null", "--image", "/dev/null"],
+ stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=False,
+ text=True).stderr
+
+
def run_shell(config: MkosiConfig) -> None:
+ cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
+
if config.output_format in (OutputFormat.directory, OutputFormat.subvolume):
- target = f"--directory={config.output}"
- else:
- target = f"--image={config.output}"
+ cmdline += ["--directory", config.output]
- cmdline = ["systemd-nspawn", "--quiet", target]
+ owner = os.stat(config.output).st_uid
+ if owner != 0:
+ cmdline += [f"--private-users={str(owner)}"]
+ else:
+ cmdline += ["--image", config.output]
# If we copied in a .nspawn file, make sure it's actually honoured
if config.nspawn_settings is not None:
if config.verb == Verb.boot:
cmdline += ["--boot"]
else:
- cmdline += nspawn_rlimit_params()
+ cmdline += [f"--rlimit=RLIMIT_CORE={format_rlimit(resource.RLIMIT_CORE)}"]
# Redirecting output correctly when not running directly from the terminal.
console_arg = f"--console={'interactive' if sys.stdout.isatty() else 'pipe'}"
cmdline += ["--ephemeral"]
cmdline += ["--machine", machine_name(config)]
-
- if config.nspawn_keep_unit:
- cmdline += ["--keep-unit"]
-
- if config.source_file_transfer_final == SourceFileTransfer.mount:
- cmdline += [f"--bind={config.build_sources}:/root/src", "--chdir=/root/src"]
+ cmdline += [f"--bind={config.build_sources}:/root/src", "--chdir=/root/src"]
for k, v in config.credentials.items():
cmdline += [f"--set-credential={k}:{v}"]
cmdline += ["--"]
cmdline += config.cmdline
- run(cmdline)
+ uid, _ = current_user_uid_gid()
+
+ if config.output_format == OutputFormat.directory:
+ acl_toggle_remove(config.output, uid, allow=False)
+
+ try:
+ run(cmdline)
+ finally:
+ if config.output_format == OutputFormat.directory:
+ acl_toggle_remove(config.output, uid, allow=True)
def find_qemu_binary(config: MkosiConfig) -> str:
return generate_secure_boot_key(config)
if config.verb == Verb.bump:
- bump_image_version(config)
+ return bump_image_version(config)
+
+ if config.verb == Verb.summary:
+ return print_summary(config)
if config.verb in MKOSI_COMMANDS_SUDO:
check_root()
check_outputs(config)
if needs_build(config) or config.verb == Verb.clean:
- check_root()
unlink_output(config)
- if config.verb == Verb.summary:
- print_summary(config)
-
if needs_build(config):
- check_native(config)
- init_namespace()
- build_stuff(config)
+ def target() -> None:
+ # Get the user UID/GID either on the host or in the user namespace running the build
+ uid, gid = become_root() if os.getuid() != 0 else current_user_uid_gid()
+ init_mount_namespace()
+ build_stuff(uid, gid, config)
+
+ # We only want to run the build in a user namespace but not the following steps. Since we can't
+ # rejoin the parent user namespace after unsharing from it, let's run the build in a fork so that
+ # the main process does not leave its user namespace.
+ fork_and_wait(target)
if config.auto_bump:
bump_image_version(config)
from subprocess import CalledProcessError
from mkosi import parse_args, run_verb
-from mkosi.backend import MkosiException, die
+from mkosi.log import MkosiException, die
+from mkosi.run import excepthook
@contextlib.contextmanager
if __name__ == "__main__":
+ sys.excepthook = excepthook
main()
import argparse
import ast
-import collections
import contextlib
import dataclasses
import enum
import functools
import importlib
+import itertools
import os
import platform
import pwd
import re
import resource
-import shlex
import shutil
-import signal
-import subprocess
import sys
import tarfile
-import uuid
-from collections.abc import Iterable, Iterator, Mapping, Sequence
+from collections.abc import Iterable, Iterator, Sequence
from pathlib import Path
-from types import FrameType
-from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- Callable,
- Deque,
- NoReturn,
- Optional,
- TypeVar,
- Union,
- cast,
-)
+from typing import Any, Callable, Optional, TypeVar, Union, cast
from mkosi.distributions import DistributionInstaller
+from mkosi.log import MkosiException, die
T = TypeVar("T")
V = TypeVar("V")
-PathString = Union[Path, str]
-
-
-def shell_join(cmd: Sequence[PathString]) -> str:
- return " ".join(shlex.quote(str(x)) for x in cmd)
@contextlib.contextmanager
return ((x + step - 1) // step) * step
-# These types are only generic during type checking and not at runtime, leading
-# to a TypeError during compilation.
-# Let's be as strict as we can with the description for the usage we have.
-if TYPE_CHECKING:
- CompletedProcess = subprocess.CompletedProcess[Any]
- Popen = subprocess.Popen[Any]
-else:
- CompletedProcess = subprocess.CompletedProcess
- Popen = subprocess.Popen
-
-
-class MkosiException(Exception):
- """Leads to sys.exit"""
-
-
-class MkosiNotSupportedException(MkosiException):
- """Leads to sys.exit when an invalid combination of parsed arguments happens"""
-
-
-# This global should be initialized after parsing arguments
-ARG_DEBUG: set[str] = set()
-
-
class Parseable:
"A mix-in to provide conversions for argparse"
)
-class SourceFileTransfer(enum.Enum):
- copy_all = "copy-all"
- copy_git_cached = "copy-git-cached"
- copy_git_others = "copy-git-others"
- copy_git_more = "copy-git-more"
- mount = "mount"
-
- def __str__(self) -> str:
- return self.value
-
- @classmethod
- def doc(cls) -> dict["SourceFileTransfer", str]:
- return {
- cls.copy_all: "normal file copy",
- cls.copy_git_cached: "use git ls-files --cached, ignoring any file that git itself ignores",
- cls.copy_git_others: "use git ls-files --others, ignoring any file that git itself ignores",
- cls.copy_git_more: "use git ls-files --cached, ignoring any file that git itself ignores, but include the .git/ directory",
- cls.mount: "bind mount source files into the build image",
- }
-
-
class OutputFormat(Parseable, enum.Enum):
directory = enum.auto()
subvolume = enum.auto()
image_version: Optional[str]
image_id: Optional[str]
hostname: Optional[str]
- chown: bool
- idmap: bool
tar_strip_selinux_context: bool
incremental: bool
cache_initrd: bool
prepare_script: Optional[Path]
postinst_script: Optional[Path]
finalize_script: Optional[Path]
- source_file_transfer: SourceFileTransfer
- source_file_transfer_final: Optional[SourceFileTransfer]
- source_resolve_symlinks: bool
- source_resolve_symlinks_final: bool
with_network: Union[bool, str]
nspawn_settings: Optional[Path]
base_image: Optional[Path]
qemu_kvm: bool
qemu_args: Sequence[str]
- # systemd-nspawn specific options
- nspawn_keep_unit: bool
-
passphrase: Optional[Path]
def architecture_is_native(self) -> bool:
class MkosiState:
"""State related properties."""
+ uid: int
+ gid: int
config: MkosiConfig
workspace: Path
cache: Path
return root.parent
-def nspawn_knows_arg(arg: str) -> bool:
- # Specify some extra incompatible options so nspawn doesn't try to boot a container in the current
- # directory if it has a compatible layout.
- return "unrecognized option" not in run(["systemd-nspawn", arg,
- "--directory", "/dev/null", "--image", "/dev/null"],
- stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=False,
- text=True).stderr
-
-
def format_rlimit(rlimit: int) -> str:
limits = resource.getrlimit(rlimit)
soft = "infinity" if limits[0] == resource.RLIM_INFINITY else str(limits[0])
return f"{soft}:{hard}"
-def nspawn_rlimit_params() -> Sequence[str]:
- return [
- f"--rlimit=RLIMIT_CORE={format_rlimit(resource.RLIMIT_CORE)}",
- ] if nspawn_knows_arg("--rlimit") else []
-
-
-def nspawn_version() -> int:
- return int(run(["systemd-nspawn", "--version"], stdout=subprocess.PIPE).stdout.strip().split()[1])
-
-
-def run_workspace_command(
- state: MkosiState,
- cmd: Sequence[PathString],
- network: bool = False,
- env: Optional[Mapping[str, str]] = None,
- nspawn_params: Optional[list[str]] = None,
- capture_stdout: bool = False,
- check: bool = True,
-) -> CompletedProcess:
- nspawn = [
- "systemd-nspawn",
- "--quiet",
- f"--directory={state.root}",
- "--machine=mkosi-" + uuid.uuid4().hex,
- "--as-pid2",
- "--link-journal=no",
- "--register=no",
- f"--bind={state.var_tmp()}:/var/tmp",
- "--setenv=SYSTEMD_OFFLINE=1",
- *nspawn_rlimit_params(),
- ]
- stdout = None
-
- if network:
- # If we're using the host network namespace, use the same resolver
- nspawn += ["--bind-ro=/etc/resolv.conf"]
- else:
- nspawn += ["--private-network"]
-
- if env:
- nspawn += [f"--setenv={k}={v}" for k, v in env.items()]
- if "workspace-command" in ARG_DEBUG:
- nspawn += ["--setenv=SYSTEMD_LOG_LEVEL=debug"]
-
- if nspawn_params:
- nspawn += nspawn_params
-
- if capture_stdout:
- stdout = subprocess.PIPE
- nspawn += ["--console=pipe"]
-
- if state.config.nspawn_keep_unit:
- nspawn += ["--keep-unit"]
-
- try:
- return run([*nspawn, "--", *cmd], check=check, stdout=stdout, text=capture_stdout)
- except subprocess.CalledProcessError as e:
- if "workspace-command" in ARG_DEBUG:
- run(nspawn, check=False)
- die(f"Workspace command {shell_join(cmd)} returned non-zero exit code {e.returncode}.")
-
-
-@contextlib.contextmanager
-def do_delay_interrupt() -> Iterator[None]:
- # CTRL+C is sent to the entire process group. We delay its handling in mkosi itself so the subprocess can
- # exit cleanly before doing mkosi's cleanup. If we don't do this, we get device or resource is busy
- # errors when unmounting stuff later on during cleanup. We only delay a single CTRL+C interrupt so that a
- # user can always exit mkosi even if a subprocess hangs by pressing CTRL+C twice.
- interrupted = False
-
- def handler(signal: int, frame: Optional[FrameType]) -> None:
- nonlocal interrupted
- if interrupted:
- raise KeyboardInterrupt()
- else:
- interrupted = True
-
- s = signal.signal(signal.SIGINT, handler)
-
- try:
- yield
- finally:
- signal.signal(signal.SIGINT, s)
-
- if interrupted:
- die("Interrupted")
-
-
-@contextlib.contextmanager
-def do_noop() -> Iterator[None]:
- yield
-
-
-# Borrowed from https://github.com/python/typeshed/blob/3d14016085aed8bcf0cf67e9e5a70790ce1ad8ea/stdlib/3/subprocess.pyi#L24
-_FILE = Union[None, int, IO[Any]]
-
-
-def spawn(
- cmdline: Sequence[PathString],
- delay_interrupt: bool = True,
- stdout: _FILE = None,
- stderr: _FILE = None,
- **kwargs: Any,
-) -> Popen:
- if "run" in ARG_DEBUG:
- MkosiPrinter.info(f"+ {shell_join(cmdline)}")
-
- if not stdout and not stderr:
- # Unless explicit redirection is done, print all subprocess
- # output on stderr, since we do so as well for mkosi's own
- # output.
- stdout = sys.stderr
-
- cm = do_delay_interrupt if delay_interrupt else do_noop
- try:
- with cm():
- return subprocess.Popen(cmdline, stdout=stdout, stderr=stderr, **kwargs)
- except FileNotFoundError:
- die(f"{cmdline[0]} not found in PATH.")
-
-
-def run(
- cmdline: Sequence[PathString],
- check: bool = True,
- delay_interrupt: bool = True,
- stdout: _FILE = None,
- stderr: _FILE = None,
- env: Mapping[str, Any] = {},
- **kwargs: Any,
-) -> CompletedProcess:
- cmdline = [os.fspath(x) for x in cmdline]
-
- if "run" in ARG_DEBUG:
- MkosiPrinter.info(f"+ {shell_join(cmdline)}")
-
- if not stdout and not stderr:
- # Unless explicit redirection is done, print all subprocess
- # output on stderr, since we do so as well for mkosi's own
- # output.
- stdout = sys.stderr
-
- cm = do_delay_interrupt if delay_interrupt else do_noop
- try:
- with cm():
- return subprocess.run(cmdline, check=check, stdout=stdout, stderr=stderr, env={**os.environ, **env}, **kwargs)
- except FileNotFoundError:
- die(f"{cmdline[0]} not found in PATH.")
-
-
def tmp_dir() -> Path:
path = os.environ.get("TMPDIR") or "/var/tmp"
return Path(path)
return path
-def die(message: str, exception: type[MkosiException] = MkosiException) -> NoReturn:
- MkosiPrinter.warn(f"Error: {message}")
- raise exception(message)
-
-
-def warn(message: str) -> None:
- MkosiPrinter.warn(f"Warning: {message}")
-
-
-class MkosiPrinter:
- out_file = sys.stderr
- isatty = out_file.isatty()
-
- bold = "\033[0;1;39m" if isatty else ""
- red = "\033[31;1m" if isatty else ""
- reset = "\033[0m" if isatty else ""
-
- prefix = "‣ "
-
- level = 0
-
- @classmethod
- def _print(cls, text: str) -> None:
- cls.out_file.write(text)
-
- @classmethod
- def color_error(cls, text: Any) -> str:
- return f"{cls.red}{text}{cls.reset}"
-
- @classmethod
- def print_step(cls, text: str) -> None:
- prefix = cls.prefix + " " * cls.level
- if sys.exc_info()[0]:
- # We are falling through exception handling blocks.
- # De-emphasize this step here, so the user can tell more
- # easily which step generated the exception. The exception
- # or error will only be printed after we finish cleanup.
- cls._print(f"{prefix}({text})\n")
- else:
- cls._print(f"{prefix}{cls.bold}{text}{cls.reset}\n")
-
- @classmethod
- def info(cls, text: str) -> None:
- cls._print(text + "\n")
-
- @classmethod
- def warn(cls, text: str) -> None:
- cls._print(f"{cls.prefix}{cls.color_error(text)}\n")
-
- @classmethod
- @contextlib.contextmanager
- def complete_step(cls, text: str, text2: Optional[str] = None) -> Iterator[list[Any]]:
- cls.print_step(text)
-
- cls.level += 1
- try:
- args: list[Any] = []
- yield args
- finally:
- cls.level -= 1
- assert cls.level >= 0
-
- if text2 is not None:
- cls.print_step(text2.format(*args))
-
-
-def chown_to_running_user(path: Path) -> None:
- uid = int(os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID") or str(os.getuid()))
- user = pwd.getpwuid(uid).pw_name
- gid = pwd.getpwuid(uid).pw_gid
-
- with MkosiPrinter.complete_step(
- f"Changing ownership of output file {path} to user {user}…",
- f"Changed ownership of {path}",
- ):
- os.chown(path, uid, gid)
-
-
-def mkdirp_chown_current_user(
- path: PathString,
- *,
- chown: bool = True,
- mode: int = 0o777,
- exist_ok: bool = True
-) -> None:
- abspath = Path(path).absolute()
- path = Path()
-
- for d in abspath.parts:
- path /= d
- if path.exists():
- continue
-
- path.mkdir(mode=mode, exist_ok=exist_ok)
-
- if chown:
- chown_to_running_user(path)
-
-
def safe_tar_extract(tar: tarfile.TarFile, path: Path=Path("."), *, numeric_owner: bool=False) -> None:
"""Extract a tar without CVE-2007-4559.
tar.extractall(path, numeric_owner=numeric_owner)
-complete_step = MkosiPrinter.complete_step
-
-
def disable_pam_securetty(root: Path) -> None:
def _rm_securetty(line: str) -> str:
if "pam_securetty.so" in line:
return sorted(packages, key=sort)
-def scandir_recursive(
- root: Path,
- filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
-) -> Iterator[T]:
- """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
- queue: Deque[Union[str, Path]] = collections.deque([root])
-
- while queue:
- for entry in os.scandir(queue.pop()):
- pred = filter(entry) if filter is not None else entry
- if pred is not None:
- yield cast(T, pred)
- if entry.is_dir(follow_symlinks=False):
- queue.append(entry.path)
+def flatten(lists: Iterable[Iterable[T]]) -> list[T]:
+ """Flatten a sequence of sequences into a single list."""
+ return list(itertools.chain.from_iterable(lists))
+
+
+def current_user_uid_gid() -> tuple[int, int]:
+ uid = int(os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID") or os.getuid())
+ gid = pwd.getpwuid(uid).pw_gid
+ return uid, gid
import os
from textwrap import dedent
-from mkosi.backend import (
- MkosiPrinter,
- MkosiState,
- add_packages,
- complete_step,
- disable_pam_securetty,
- run,
- sort_packages,
-)
+from mkosi.backend import MkosiState, add_packages, disable_pam_securetty, sort_packages
from mkosi.distributions import DistributionInstaller
-from mkosi.mounts import mount_api_vfs
+from mkosi.log import complete_step
+from mkosi.run import run_with_apivfs
+from mkosi.types import PathString
class ArchInstaller(DistributionInstaller):
@complete_step("Installing Arch Linux…")
def install_arch(state: MkosiState) -> None:
- if state.config.release is not None:
- MkosiPrinter.info("Distribution release specification is not supported for Arch Linux, ignoring.")
-
assert state.config.mirror
if state.config.local_mirror:
# Create base layout for pacman and pacman-key
os.makedirs(state.root / "var/lib/pacman", 0o755, exist_ok=True)
- os.makedirs(state.root / "etc/pacman.d/gnupg", 0o755, exist_ok=True)
-
- # Permissions on these directories are all 0o777 because of 'mount --bind'
- # limitations but pacman expects them to be 0o755 so we fix them before
- # calling pacman (except /var/tmp which is 0o1777).
- fix_permissions_dirs = {
- "boot": 0o755,
- "etc": 0o755,
- "etc/pacman.d": 0o755,
- "var": 0o755,
- "var/lib": 0o755,
- "var/cache": 0o755,
- "var/cache/pacman": 0o755,
- "var/tmp": 0o1777,
- "run": 0o755,
- }
-
- for dir, permissions in fix_permissions_dirs.items():
- if (path := state.root / dir).exists():
- path.chmod(permissions)
pacman_conf = state.workspace / "pacman.conf"
if state.config.repository_key_check:
[options]
RootDir = {state.root}
LogFile = /dev/null
- CacheDir = {state.root}/var/cache/pacman/pkg/
+ CacheDir = {state.config.cache_path}
GPGDir = /etc/pacman.d/gnupg/
HookDir = {state.root}/etc/pacman.d/hooks/
HoldPkg = pacman glibc
if not state.do_run_build_script and state.config.ssh:
add_packages(state.config, packages, "openssh")
- with mount_api_vfs(state.root):
- run(["pacman", "--config", pacman_conf, "--noconfirm", "-Sy", *sort_packages(packages)],
- env={"KERNEL_INSTALL_BYPASS": state.environment.get("KERNEL_INSTALL_BYPASS", "1")})
+ cmdline: list[PathString] = [
+ "pacman",
+ "--config", pacman_conf,
+ "--noconfirm",
+ "-Sy", *sort_packages(packages),
+ ]
+
+ run_with_apivfs(state, cmdline, env=dict(KERNEL_INSTALL_BYPASS="1"))
state.root.joinpath("etc/pacman.d/mirrorlist").write_text(f"Server = {state.config.mirror}/$repo/os/$arch\n")
import shutil
from pathlib import Path
-from mkosi.backend import (
- Distribution,
- MkosiConfig,
- MkosiState,
- add_packages,
- complete_step,
- die,
- run_workspace_command,
-)
+from mkosi.backend import Distribution, MkosiConfig, MkosiState, add_packages
from mkosi.distributions import DistributionInstaller
from mkosi.distributions.fedora import Repo, install_packages_dnf, invoke_dnf, setup_dnf
+from mkosi.log import complete_step, die
from mkosi.remove import unlink_try_hard
+from mkosi.run import run_workspace_command
def move_rpm_db(root: Path) -> None:
setup_dnf(state, repos)
if state.config.distribution == Distribution.centos:
- state.workspace.joinpath("vars/stream").write_text(f"{state.config.release}-stream")
+ env = dict(DNF_VAR_stream=f"{state.config.release}-stream")
+ else:
+ env = {}
packages = {*state.config.packages}
add_packages(state.config, packages, "systemd", "dnf")
if release <= 8:
add_packages(state.config, packages, "glibc-minimal-langpack")
- install_packages_dnf(state, packages)
+ install_packages_dnf(state, packages, env)
# On Fedora, the default rpmdb has moved to /usr/lib/sysimage/rpm so if that's the case we need to
# move it back to /var/lib/rpm on CentOS.
# SPDX-License-Identifier: LGPL-2.1+
-import contextlib
import os
+import shutil
import subprocess
-from collections.abc import Iterable, Iterator
+from collections.abc import Iterable
from pathlib import Path
from textwrap import dedent
-from typing import TYPE_CHECKING, Any
-
-from mkosi.backend import (
- MkosiState,
- PathString,
- add_packages,
- complete_step,
- disable_pam_securetty,
- run,
- run_workspace_command,
-)
+
+from mkosi.backend import MkosiState, add_packages, disable_pam_securetty
from mkosi.distributions import DistributionInstaller
from mkosi.install import install_skeleton_trees, write_resource
-from mkosi.mounts import mount_api_vfs, mount_bind
-
-if TYPE_CHECKING:
- CompletedProcess = subprocess.CompletedProcess[Any]
-else:
- CompletedProcess = subprocess.CompletedProcess
+from mkosi.run import run, run_with_apivfs
+from mkosi.types import _FILE, CompletedProcess, PathString
class DebianInstaller(DistributionInstaller):
# the base image.
state.root.joinpath("etc/resolv.conf").unlink(missing_ok=True)
state.root.joinpath("etc/resolv.conf").symlink_to("../run/systemd/resolve/resolv.conf")
- run(["systemctl", "--root", state.root, "enable", "systemd-resolved"])
@classmethod
def cache_path(cls) -> list[str]:
"--variant=minbase",
"--include=ca-certificates",
"--merged-usr",
+ f"--cache-dir={state.cache}",
f"--components={','.join(repos)}",
]
mirror = state.config.local_mirror or state.config.mirror
assert mirror is not None
cmdline += [state.config.release, state.root, mirror]
- run(cmdline)
+
+ # Pretend we're lxc so debootstrap skips its mknod check.
+ run_with_apivfs(state, cmdline, env=dict(container="lxc"))
# Install extra packages via the secondary APT run, because it is smarter and can deal better with any
# conflicts. dbus and libpam-systemd are optional dependencies for systemd in debian so we include them
policyrcd.chmod(0o755)
doc_paths = [
- "/usr/share/locale",
- "/usr/share/doc",
- "/usr/share/man",
- "/usr/share/groff",
- "/usr/share/info",
- "/usr/share/lintian",
- "/usr/share/linda",
+ state.root / "usr/share/locale",
+ state.root / "usr/share/doc",
+ state.root / "usr/share/man",
+ state.root / "usr/share/groff",
+ state.root / "usr/share/info",
+ state.root / "usr/share/lintian",
+ state.root / "usr/share/linda",
]
if not state.config.with_docs:
# Remove documentation installed by debootstrap
- cmdline = ["/bin/rm", "-rf", *doc_paths]
- run_workspace_command(state, cmdline)
+ for d in doc_paths:
+ try:
+ shutil.rmtree(d)
+ except FileNotFoundError:
+ pass
# Create dpkg.cfg to ignore documentation on new packages
dpkg_nodoc_conf = state.root / "etc/dpkg/dpkg.cfg.d/01_nodoc"
with dpkg_nodoc_conf.open("w") as f:
stdout=subprocess.PIPE, check=False).stdout
-@contextlib.contextmanager
-def mount_apt_local_mirror(state: MkosiState) -> Iterator[None]:
- # Ensure apt inside the image can see the local mirror outside of it
- mirror = state.config.local_mirror or state.config.mirror
- if not mirror or not mirror.startswith("file:"):
- yield
- return
-
- # Strip leading '/' as Path() does not behave well when concatenating
- mirror_dir = mirror[5:].lstrip("/")
-
- with complete_step("Mounting apt local mirror…", "Unmounting apt local mirror…"):
- with mount_bind(Path("/") / mirror_dir, state.root / mirror_dir):
- yield
-
-
def invoke_apt(
state: MkosiState,
subcommand: str,
operation: str,
extra: Iterable[str],
- **kwargs: Any,
+ stdout: _FILE = None,
) -> CompletedProcess:
- config_file = state.workspace / "apt.conf"
+ state.workspace.joinpath("apt").mkdir(exist_ok=True)
+ state.workspace.joinpath("apt/log").mkdir(exist_ok=True)
+ state.root.joinpath("var/lib/dpkg").mkdir(exist_ok=True)
+ state.root.joinpath("var/lib/dpkg/status").touch()
+
+ config_file = state.workspace / "apt/apt.conf"
debarch = DEBIAN_ARCHITECTURES[state.config.architecture]
- if not config_file.exists():
- config_file.write_text(
- dedent(
- f"""\
- Dir "{state.root}";
- DPkg::Chroot-Directory "{state.root}";
- """
- )
+ config_file.write_text(
+ dedent(
+ f"""\
+ APT::Architecture "{debarch}";
+ APT::Immediate-Configure "off";
+ Dir::Cache "{state.cache}";
+ Dir::State "{state.workspace / "apt"}";
+ Dir::State::status "{state.root / "var/lib/dpkg/status"}";
+ Dir::Etc "{state.root / "etc/apt"}";
+ Dir::Log "{state.workspace / "apt/log"}";
+ DPkg::Options:: "--root={state.root}";
+ DPkg::Options:: "--log={state.workspace / "apt/dpkg.log"}";
+ DPkg::Install::Recursive::Minimum "1000";
+ """
)
+ )
cmdline = [
f"/usr/bin/apt-{subcommand}",
- "-o", f"APT::Architecture={debarch}",
- "-o", "dpkg::install::recursive::minimum=1000",
operation,
*extra,
]
env = dict(
- APT_CONFIG=f"{config_file}",
+ APT_CONFIG=config_file,
DEBIAN_FRONTEND="noninteractive",
- DEBCONF_NONINTERACTIVE_SEEN="true",
+ DEBCONF_INTERACTIVE_SEEN="true",
INITRD="No",
)
- with mount_apt_local_mirror(state), mount_api_vfs(state.root):
- return run(cmdline, env=env, text=True, **kwargs)
+ return run_with_apivfs(state, cmdline, stdout=stdout, env=env)
def add_apt_package_if_exists(state: MkosiState, extra_packages: set[str], package: str) -> None:
import shutil
import urllib.parse
import urllib.request
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable, Mapping, Sequence
from pathlib import Path
from textwrap import dedent
-from typing import NamedTuple, Optional
+from typing import Any, NamedTuple, Optional
from mkosi.backend import (
Distribution,
- MkosiPrinter,
MkosiState,
add_packages,
- complete_step,
detect_distribution,
- run,
sort_packages,
- warn,
)
from mkosi.distributions import DistributionInstaller
-from mkosi.mounts import mount_api_vfs
+from mkosi.log import MkosiPrinter, complete_step, warn
from mkosi.remove import unlink_try_hard
+from mkosi.run import run_with_apivfs
FEDORA_KEYS_MAP = {
"36": "53DED2CB922D8B8D9E63FD18999F7CBF38AB71F4",
return packages
-def install_packages_dnf(state: MkosiState, packages: set[str],) -> None:
+def install_packages_dnf(state: MkosiState, packages: set[str], env: Mapping[str, Any] = {}) -> None:
packages = make_rpm_list(state, packages)
- invoke_dnf(state, 'install', packages)
+ invoke_dnf(state, 'install', packages, env)
class Repo(NamedTuple):
def setup_dnf(state: MkosiState, repos: Sequence[Repo] = ()) -> None:
- gpgcheck = True
+ with state.workspace.joinpath("dnf.conf").open("w") as f:
+ gpgcheck = True
- repo_file = state.workspace / "mkosi.repo"
- with repo_file.open("w") as f:
for repo in repos:
gpgkey: Optional[str] = None
name={repo.id}
{repo.url}
gpgkey={gpgkey or ''}
+ gpgcheck={int(gpgcheck)}
enabled={int(repo.enabled)}
- check_config_file_age=False
+ check_config_file_age=0
"""
)
)
- default_repos = f"reposdir={state.workspace} {' '.join(str(p) for p in state.config.repo_dirs)}"
- vars_dir = state.workspace / "vars"
- vars_dir.mkdir(exist_ok=True)
-
- config_file = state.workspace / "dnf.conf"
- config_file.write_text(
- dedent(
- f"""\
- [main]
- gpgcheck={'1' if gpgcheck else '0'}
- {default_repos }
- varsdir={vars_dir}
- """
- )
- )
-
-
-def invoke_dnf(state: MkosiState, command: str, packages: Iterable[str]) -> None:
+def invoke_dnf(state: MkosiState, command: str, packages: Iterable[str], env: Mapping[str, Any] = {}) -> None:
if state.config.distribution == Distribution.fedora:
release, _ = parse_fedora_release(state.config.release)
else:
- release = state.config.release.strip("-stream")
-
- config_file = state.workspace / "dnf.conf"
+ release = state.config.release
- cmd = 'dnf' if shutil.which('dnf') else 'yum'
+ state.workspace.joinpath("vars").mkdir(exist_ok=True)
cmdline = [
- cmd,
+ 'dnf' if shutil.which('dnf') else 'yum',
"-y",
- f"--config={config_file}",
+ f"--config={state.workspace.joinpath('dnf.conf')}",
"--best",
"--allowerasing",
f"--releasever={release}",
f"--installroot={state.root}",
"--setopt=keepcache=1",
"--setopt=install_weak_deps=0",
+ f"--setopt=cachedir={state.cache}",
+ f"--setopt=reposdir={' '.join(str(p) for p in state.config.repo_dirs)}",
+ f"--setopt=varsdir={state.workspace / 'vars'}",
"--noplugins",
]
cmdline += [command, *sort_packages(packages)]
- with mount_api_vfs(state.root):
- run(cmdline, env={"KERNEL_INSTALL_BYPASS": state.environment.get("KERNEL_INSTALL_BYPASS", "1")})
+ run_with_apivfs(state, cmdline, env=dict(KERNEL_INSTALL_BYPASS="1") | env)
distribution, _ = detect_distribution()
if distribution not in (Distribution.debian, Distribution.ubuntu):
from pathlib import Path
from textwrap import dedent
-from mkosi.backend import (
- ARG_DEBUG,
- MkosiException,
- MkosiPrinter,
- MkosiState,
- complete_step,
- die,
- run_workspace_command,
- safe_tar_extract,
-)
+from mkosi.backend import MkosiState, safe_tar_extract
from mkosi.distributions import DistributionInstaller
from mkosi.install import copy_path, flock
+from mkosi.log import ARG_DEBUG, MkosiException, MkosiPrinter, complete_step, die
from mkosi.remove import unlink_try_hard
+from mkosi.run import run_workspace_command
ARCHITECTURES = {
"x86_64": ("amd64", "arch/x86/boot/bzImage"),
self.portage_cfg_dir.mkdir(parents=True, exist_ok=True)
- self.DEFAULT_NSPAWN_PARAMS = [
- "--capability=CAP_SYS_ADMIN,CAP_MKNOD",
- f"--bind={self.portage_cfg['PORTDIR']}",
- f"--bind={self.portage_cfg['DISTDIR']}",
- f"--bind={self.portage_cfg['PKGDIR']}",
- ]
-
jobs = os.cpu_count() or 1
self.emerge_default_opts = [
"--buildpkg=y",
if self.state.do_run_build_script:
self.invoke_emerge(pkgs=self.state.config.build_packages)
if self.state.config.packages:
- self.invoke_emerge(pkgs=self.state.config.packages, check=False)
+ self.invoke_emerge(pkgs=self.state.config.packages)
+
def invoke_emerge(
self,
- check: bool = True,
inside_stage3: bool = True,
pkgs: Sequence[str] = (),
actions: Sequence[str] = (),
MkosiPrinter.print_step("Invoking emerge(1) inside stage3"
f"{self.root}")
- run_workspace_command(self.state, cmd, network=True, env=self.emerge_vars,
- nspawn_params=self.DEFAULT_NSPAWN_PARAMS,
- check=check)
+
+ bwrap = [
+ "--bind", self.portage_cfg['PORTDIR'], self.portage_cfg['PORTDIR'],
+ "--bind", self.portage_cfg['DISTDIR'], self.portage_cfg['DISTDIR'],
+ "--bind", self.portage_cfg['PKGDIR'], self.portage_cfg['PKGDIR'],
+ ]
+ run_workspace_command(self.state, cmd, network=True, bwrap_params=bwrap)
def _dbg(self, state: MkosiState) -> None:
"""this is for dropping into shell to see what's wrong"""
- cmd = ["/usr/bin/sh"]
- run_workspace_command(self.state, cmd, network=True,
- nspawn_params=self.DEFAULT_NSPAWN_PARAMS)
+ bwrap = [
+ "--bind", self.portage_cfg['PORTDIR'], self.portage_cfg['PORTDIR'],
+ "--bind", self.portage_cfg['DISTDIR'], self.portage_cfg['DISTDIR'],
+ "--bind", self.portage_cfg['PKGDIR'], self.portage_cfg['PKGDIR'],
+ ]
+ run_workspace_command(self.state, ["sh"], network=True, bwrap_params=bwrap)
class GentooInstaller(DistributionInstaller):
from pathlib import Path
-from mkosi.backend import MkosiState, add_packages, complete_step, disable_pam_securetty
+from mkosi.backend import MkosiState, add_packages, disable_pam_securetty
from mkosi.distributions import DistributionInstaller
from mkosi.distributions.fedora import Repo, install_packages_dnf, invoke_dnf, setup_dnf
+from mkosi.log import complete_step
class MageiaInstaller(DistributionInstaller):
from pathlib import Path
-from mkosi.backend import MkosiState, add_packages, complete_step
+from mkosi.backend import MkosiState, add_packages
from mkosi.distributions import DistributionInstaller
from mkosi.distributions.fedora import Repo, install_packages_dnf, invoke_dnf, setup_dnf
+from mkosi.log import complete_step
class OpenmandrivaInstaller(DistributionInstaller):
import shutil
-from mkosi.backend import (
- MkosiState,
- PathString,
- add_packages,
- complete_step,
- patch_file,
- run,
- sort_packages,
-)
+from mkosi.backend import MkosiState, add_packages, patch_file, sort_packages
from mkosi.distributions import DistributionInstaller
-from mkosi.mounts import mount_api_vfs
+from mkosi.log import complete_step
+from mkosi.run import run, run_with_apivfs
+from mkosi.types import PathString
class OpensuseInstaller(DistributionInstaller):
"--root",
state.root,
"--gpg-auto-import-keys" if state.config.repository_key_check else "--no-gpg-checks",
+ "--cache-dir", state.cache,
"install",
"-y",
"--no-recommends",
*sort_packages(packages),
]
- with mount_api_vfs(state.root):
- run(cmdline)
+ run_with_apivfs(state, cmdline)
# Disable package caching in the image that was enabled previously to populate the package cache.
run(["zypper", "--root", state.root, "modifyrepo", "-K", "repo-oss"])
from textwrap import dedent
from typing import Optional
-from mkosi.backend import MkosiState, complete_step, run
+from mkosi.backend import MkosiState
+from mkosi.log import complete_step
+from mkosi.run import run
def make_executable(path: Path) -> None:
os.close(fd)
-def copy_path(src: Path, dst: Path, parents: bool = False) -> None:
- run(["cp", "--archive", "--no-target-directory", "--reflink=auto", src, dst])
+def copy_path(src: Path, dst: Path, preserve_owner: bool = True) -> None:
+ run([
+ "cp",
+ "--recursive",
+ "--no-dereference",
+ f"--preserve=mode,timestamps,links,xattr{',ownership' if preserve_owner else ''}",
+ "--no-target-directory",
+ "--reflink=auto",
+ src, dst,
+ ])
def install_skeleton_trees(state: MkosiState, cached: bool, *, late: bool=False) -> None:
with complete_step("Copying in skeleton file trees…"):
for tree in state.config.skeleton_trees:
if tree.is_dir():
- copy_path(tree, state.root)
+ copy_path(tree, state.root, preserve_owner=False)
else:
# unpack_archive() groks Paths, but mypy doesn't know this.
# Pretend that tree is a str.
--- /dev/null
+import contextlib
+import sys
+from typing import Any, Iterator, NoReturn, Optional
+
+# This global should be initialized after parsing arguments
+ARG_DEBUG: set[str] = set()
+
+
+class MkosiException(Exception):
+ """Leads to sys.exit"""
+
+
+class MkosiNotSupportedException(MkosiException):
+ """Leads to sys.exit when an invalid combination of parsed arguments happens"""
+
+
+def die(message: str, exception: type[MkosiException] = MkosiException) -> NoReturn:
+ MkosiPrinter.warn(f"Error: {message}")
+ raise exception(message)
+
+
+def warn(message: str) -> None:
+ MkosiPrinter.warn(f"Warning: {message}")
+
+
+class MkosiPrinter:
+ out_file = sys.stderr
+ isatty = out_file.isatty()
+
+ bold = "\033[0;1;39m" if isatty else ""
+ red = "\033[31;1m" if isatty else ""
+ reset = "\033[0m" if isatty else ""
+
+ prefix = "‣ "
+
+ level = 0
+
+ @classmethod
+ def _print(cls, text: str) -> None:
+ cls.out_file.write(text)
+
+ @classmethod
+ def color_error(cls, text: Any) -> str:
+ return f"{cls.red}{text}{cls.reset}"
+
+ @classmethod
+ def print_step(cls, text: str) -> None:
+ prefix = cls.prefix + " " * cls.level
+ if sys.exc_info()[0]:
+ # We are falling through exception handling blocks.
+ # De-emphasize this step here, so the user can tell more
+ # easily which step generated the exception. The exception
+ # or error will only be printed after we finish cleanup.
+ cls._print(f"{prefix}({text})\n")
+ else:
+ cls._print(f"{prefix}{cls.bold}{text}{cls.reset}\n")
+
+ @classmethod
+ def info(cls, text: str) -> None:
+ cls._print(text + "\n")
+
+ @classmethod
+ def warn(cls, text: str) -> None:
+ cls._print(f"{cls.prefix}{cls.color_error(text)}\n")
+
+ @classmethod
+ @contextlib.contextmanager
+ def complete_step(cls, text: str, text2: Optional[str] = None) -> Iterator[list[Any]]:
+ cls.print_step(text)
+
+ cls.level += 1
+ try:
+ args: list[Any] = []
+ yield args
+ finally:
+ cls.level -= 1
+ assert cls.level >= 0
+
+ if text2 is not None:
+ cls.print_step(text2.format(*args))
+
+
+complete_step = MkosiPrinter.complete_step
from textwrap import dedent
from typing import IO, Any, Optional
-from mkosi.backend import Distribution, ManifestFormat, MkosiConfig, PackageType, run
+from mkosi.backend import Distribution, ManifestFormat, MkosiConfig, PackageType
+from mkosi.run import run
@dataclasses.dataclass
# SPDX-License-Identifier: LGPL-2.1+
+import collections
import contextlib
import os
import stat
from collections.abc import Iterator, Sequence
from pathlib import Path
-from typing import ContextManager, Optional, Union, cast
+from typing import Callable, ContextManager, Deque, Optional, TypeVar, Union, cast
-from mkosi.backend import complete_step, run, scandir_recursive
+from mkosi.log import complete_step
+from mkosi.run import run
+from mkosi.types import PathString
-PathString = Union[Path, str]
+T = TypeVar("T")
+
+
+def scandir_recursive(
+ root: Path,
+ filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
+) -> Iterator[T]:
+ """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
+ queue: Deque[Union[str, Path]] = collections.deque([root])
+
+ while queue:
+ for entry in os.scandir(queue.pop()):
+ pred = filter(entry) if filter is not None else entry
+ if pred is not None:
+ yield cast(T, pred)
+ if entry.is_dir(follow_symlinks=False):
+ queue.append(entry.path)
def stat_is_whiteout(st: os.stat_result) -> bool:
@contextlib.contextmanager
def mount(
- what: PathString,
- where: Path,
- operation: Optional[str] = None,
- options: Sequence[str] = (),
- type: Optional[str] = None,
- read_only: bool = False,
+ what: PathString,
+ where: Path,
+ operation: Optional[str] = None,
+ options: Sequence[str] = (),
+ type: Optional[str] = None,
+ read_only: bool = False,
) -> Iterator[Path]:
os.makedirs(where, 0o755, True)
run(["umount", "--no-mtab", "--recursive", where])
-def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
+def mount_bind(what: Path, where: Optional[Path] = None, read_only: bool = False) -> ContextManager[Path]:
if where is None:
where = what
return mount(what, where, operation="--bind")
-def mount_tmpfs(where: Path) -> ContextManager[Path]:
- return mount("tmpfs", where, type="tmpfs")
-
-
@contextlib.contextmanager
def mount_overlay(
lower: Path,
delete_whiteout_files(upper)
-@contextlib.contextmanager
-def mount_api_vfs(root: Path) -> Iterator[None]:
- subdirs = ("proc", "dev", "sys")
-
- with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
- for subdir in subdirs:
- stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
-
- yield
-
-
@contextlib.contextmanager
def dissect_and_mount(image: Path, where: Path) -> Iterator[Path]:
run(["systemd-dissect", "-M", image, where])
--- /dev/null
+import ctypes
+import ctypes.util
+import multiprocessing
+import os
+import pwd
+import shlex
+import signal
+import subprocess
+import sys
+import traceback
+from pathlib import Path
+from types import TracebackType
+from typing import Any, Callable, Iterable, Mapping, Optional, Sequence, Type, TypeVar
+
+from mkosi.backend import MkosiState
+from mkosi.log import ARG_DEBUG, MkosiPrinter, die
+from mkosi.types import _FILE, CompletedProcess, PathString, Popen
+
+CLONE_NEWNS = 0x00020000
+CLONE_NEWUSER = 0x10000000
+
+SUBRANGE = 65536
+
+T = TypeVar("T")
+
+
+def unshare(flags: int) -> None:
+ libc_name = ctypes.util.find_library("c")
+ if libc_name is None:
+ die("Could not find libc")
+ libc = ctypes.CDLL(libc_name, use_errno=True)
+
+ if libc.unshare(ctypes.c_int(flags)) != 0:
+ e = ctypes.get_errno()
+ raise OSError(e, os.strerror(e))
+
+
+def read_subrange(path: Path) -> int:
+ uid = str(os.getuid())
+ try:
+ user = pwd.getpwuid(os.getuid()).pw_name
+ except KeyError:
+ user = None
+
+ for line in path.read_text().splitlines():
+ name, start, count = line.split(":")
+
+ if name == uid or name == user:
+ break
+ else:
+ die(f"No mapping found for {user or uid} in {path}")
+
+ if int(count) < SUBRANGE:
+ die(f"subuid/subgid range length must be at least {SUBRANGE}, got {count} for {user or uid} from line '{line}'")
+
+ return int(start)
+
+
+def become_root() -> tuple[int, int]:
+ """
+ Set up a new user namespace mapping using /etc/subuid and /etc/subgid.
+
+ The current user will be mapped to root and 65436 will be mapped to the UID/GID of the invoking user.
+ The other IDs will be mapped through.
+
+ The function returns the UID-GID pair of the invoking user in the namespace (65436, 65436).
+ """
+ subuid = read_subrange(Path("/etc/subuid"))
+ subgid = read_subrange(Path("/etc/subgid"))
+
+ event = multiprocessing.Event()
+ pid = os.getpid()
+
+ child = os.fork()
+ if child == 0:
+ event.wait()
+
+ # We map the private UID range configured in /etc/subuid and /etc/subgid into the container using
+ # newuidmap and newgidmap. On top of that, we also make sure to map in the user running mkosi so that
+ # we can run still chown stuff to that user or run stuff as that user which will make sure any
+ # generated files are owned by that user. We don't map to the last user in the range as the last user
+ # is sometimes used in tests as a default value and mapping to that user might break those tests.
+ newuidmap = [
+ "newuidmap", pid,
+ 0, subuid, SUBRANGE - 100,
+ SUBRANGE - 100, os.getuid(), 1,
+ SUBRANGE - 100 + 1, subuid + SUBRANGE - 100 + 1, 99
+ ]
+ run((str(x) for x in newuidmap))
+
+ newgidmap = [
+ "newgidmap", pid,
+ 0, subgid, SUBRANGE - 100,
+ SUBRANGE - 100, os.getgid(), 1,
+ SUBRANGE - 100 + 1, subgid + SUBRANGE - 100 + 1, 99
+ ]
+ run(str(x) for x in newgidmap)
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ os._exit(0)
+
+ unshare(CLONE_NEWUSER)
+ event.set()
+ os.waitpid(child, 0)
+
+ # By default, we're root in the user namespace because if we were our current user by default, we
+ # wouldn't be able to chown stuff to be owned by root while the reverse is possible.
+ os.setresuid(0, 0, 0)
+ os.setresgid(0, 0, 0)
+ os.setgroups([0])
+
+ return SUBRANGE - 100, SUBRANGE - 100
+
+
+def init_mount_namespace() -> None:
+ unshare(CLONE_NEWNS)
+ run(["mount", "--make-rslave", "/"])
+
+
+def foreground() -> None:
+ """
+ If we're connected to a terminal, put the process in a new process group and make that the foreground
+ process group so that only this process receives SIGINT.
+ """
+ if sys.stdin.isatty():
+ os.setpgrp()
+ old = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
+ os.tcsetpgrp(0, os.getpgrp())
+ signal.signal(signal.SIGTTOU, old)
+
+
+class RemoteException(Exception):
+ """
+ Stores the exception from a subprocess along with its traceback. We have to do this explicitly because
+ the original traceback object cannot be pickled. When stringified, produces the subprocess stacktrace
+ plus the exception message.
+ """
+ def __init__(self, e: BaseException, tb: traceback.StackSummary):
+ self.exception = e
+ self.tb = tb
+
+ def __str__(self) -> str:
+ return f"Traceback (most recent call last):\n{''.join(self.tb.format()).strip()}\n{type(self.exception).__name__}: {self.exception}"
+
+
+def excepthook(exctype: Type[BaseException], exc: BaseException, tb: Optional[TracebackType]) -> None:
+ """Attach to sys.excepthook to automically format exceptions with a RemoteException attached correctly."""
+ if isinstance(exc.__cause__, RemoteException):
+ print(exc.__cause__, file=sys.stderr)
+ else:
+ sys.__excepthook__(exctype, exc, tb)
+
+
+def fork_and_wait(target: Callable[[], T]) -> T:
+ """Run the target function in the foreground in a child process and collect its backtrace if there is one."""
+ pout, pin = multiprocessing.Pipe(duplex=False)
+
+ pid = os.fork()
+ if pid == 0:
+ foreground()
+
+ try:
+ result = target()
+ except BaseException as e:
+ # Just getting the stacktrace from the traceback doesn't get us the parent frames for some reason
+ # so we have to attach those manually.
+ tb = traceback.StackSummary.from_list(traceback.extract_stack()[:-1] + traceback.extract_tb(e.__traceback__))
+ pin.send(RemoteException(e, tb))
+ else:
+ pin.send(result)
+ finally:
+ pin.close()
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ os._exit(0)
+
+ os.waitpid(pid, 0)
+ result = pout.recv()
+ if isinstance(result, RemoteException):
+ # Reraise the original exception and attach the remote exception with full traceback as the cause.
+ raise result.exception from result
+
+ return result
+
+
+def run(
+ cmdline: Iterable[PathString],
+ check: bool = True,
+ stdout: _FILE = None,
+ stderr: _FILE = None,
+ env: Optional[Mapping[str, Any]] = None,
+ **kwargs: Any,
+) -> CompletedProcess:
+ cmdline = [os.fspath(x) for x in cmdline]
+
+ if "run" in ARG_DEBUG:
+ MkosiPrinter.info(f"+ {shlex.join(str(s) for s in cmdline)}")
+
+ if not stdout and not stderr:
+ # Unless explicit redirection is done, print all subprocess
+ # output on stderr, since we do so as well for mkosi's own
+ # output.
+ stdout = sys.stderr
+
+ if env is None:
+ env = os.environ
+ else:
+ env = dict(
+ PATH=os.environ["PATH"],
+ TERM=os.getenv("TERM", "vt220"),
+ ) | env
+
+ try:
+ return subprocess.run(cmdline, check=check, stdout=stdout, stderr=stderr, env=env, **kwargs,
+ preexec_fn=foreground)
+ except FileNotFoundError:
+ die(f"{cmdline[0]} not found in PATH.")
+
+
+def spawn(
+ cmdline: Sequence[PathString],
+ stdout: _FILE = None,
+ stderr: _FILE = None,
+ **kwargs: Any,
+) -> Popen:
+ if "run" in ARG_DEBUG:
+ MkosiPrinter.info(f"+ {shlex.join(str(s) for s in cmdline)}")
+
+ if not stdout and not stderr:
+ # Unless explicit redirection is done, print all subprocess
+ # output on stderr, since we do so as well for mkosi's own
+ # output.
+ stdout = sys.stderr
+
+ try:
+ return subprocess.Popen(cmdline, stdout=stdout, stderr=stderr, **kwargs, preexec_fn=foreground)
+ except FileNotFoundError:
+ die(f"{cmdline[0]} not found in PATH.")
+
+
+def run_with_apivfs(
+ state: MkosiState,
+ cmd: Sequence[PathString],
+ bwrap_params: Sequence[PathString] = tuple(),
+ stdout: _FILE = None,
+ env: Mapping[str, Any] = {},
+) -> CompletedProcess:
+ cmdline: list[PathString] = [
+ "bwrap",
+ # Required to make chroot detection via /proc/1/root work properly.
+ "--unshare-pid",
+ "--dev-bind", "/", "/",
+ "--tmpfs", state.root / "run",
+ "--tmpfs", state.root / "tmp",
+ "--proc", state.root / "proc",
+ "--dev", state.root / "dev",
+ "--ro-bind", "/sys", state.root / "sys",
+ "--bind", state.var_tmp(), state.root / "var/tmp",
+ *bwrap_params,
+ "sh", "-c",
+ ]
+
+ env = env | state.environment
+
+ template = f"chmod 1777 {state.root / 'tmp'} {state.root / 'var/tmp'} {state.root / 'dev/shm'} && exec {{}} || exit $?"
+
+ try:
+ return run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
+ text=True, stdout=stdout, env=env)
+ except subprocess.CalledProcessError as e:
+ if "run" in ARG_DEBUG:
+ run([*cmdline, template.format("sh")], check=False, env=env)
+ die(f"\"{shlex.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
+
+
+def run_workspace_command(
+ state: MkosiState,
+ cmd: Sequence[PathString],
+ bwrap_params: Sequence[PathString] = tuple(),
+ network: bool = False,
+ stdout: _FILE = None,
+ env: Mapping[str, Any] = {},
+) -> CompletedProcess:
+ cmdline: list[PathString] = [
+ "bwrap",
+ "--unshare-ipc",
+ "--unshare-pid",
+ "--unshare-cgroup",
+ "--bind", state.root, "/",
+ "--tmpfs", "/run",
+ "--tmpfs", "/tmp",
+ "--dev", "/dev",
+ "--proc", "/proc",
+ "--ro-bind", "/sys", "/sys",
+ "--bind", state.var_tmp(), "/var/tmp",
+ *bwrap_params,
+ ]
+
+ if network:
+ # If we're using the host network namespace, use the same resolver
+ cmdline += ["--ro-bind", "/etc/resolv.conf", "/etc/resolv.conf"]
+ else:
+ cmdline += ["--unshare-net"]
+
+ cmdline += ["sh", "-c"]
+
+ env = dict(
+ container="mkosi",
+ SYSTEMD_OFFLINE=str(int(network)),
+ HOME="/",
+ ) | env | state.environment
+
+ template = "chmod 1777 /tmp /var/tmp /dev/shm && exec {} || exit $?"
+
+ try:
+ return run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
+ text=True, stdout=stdout, env=env)
+ except subprocess.CalledProcessError as e:
+ if "run" in ARG_DEBUG:
+ run([*cmdline, template.format("sh")], check=False, env=env)
+ die(f"\"{shlex.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
--- /dev/null
+import subprocess
+import tempfile
+from pathlib import Path
+from typing import IO, TYPE_CHECKING, Any, Union
+
+# These types are only generic during type checking and not at runtime, leading
+# to a TypeError during compilation.
+# Let's be as strict as we can with the description for the usage we have.
+if TYPE_CHECKING:
+ CompletedProcess = subprocess.CompletedProcess[Any]
+ Popen = subprocess.Popen[Any]
+ TempDir = tempfile.TemporaryDirectory[str]
+else:
+ CompletedProcess = subprocess.CompletedProcess
+ Popen = subprocess.Popen
+ TempDir = tempfile.TemporaryDirectory
+
+# Borrowed from https://github.com/python/typeshed/blob/3d14016085aed8bcf0cf67e9e5a70790ce1ad8ea/stdlib/3/subprocess.pyi#L24
+_FILE = Union[None, int, IO[Any]]
+PathString = Union[Path, str]
from mkosi.backend import (
Distribution,
- MkosiException,
PackageType,
safe_tar_extract,
set_umask,
strip_suffixes,
workspace,
)
+from mkosi.log import MkosiException
def test_distribution() -> None:
import pytest
import mkosi
-from mkosi.backend import Distribution, MkosiConfig, MkosiException, Verb
+from mkosi.backend import Distribution, MkosiConfig, Verb
+from mkosi.log import MkosiException
def parse(argv: Optional[List[str]] = None) -> MkosiConfig: