import dataclasses
import datetime
import hashlib
-import http.server
import importlib.resources
import itertools
import json
from mkosi.types import _FILE, CompletedProcess, PathString
from mkosi.util import (
InvokingUser,
+ chdir,
flatten,
format_rlimit,
one_zero,
BUILDROOT=str(state.root),
CHROOT_SCRIPT="/work/prepare",
CHROOT_SRCDIR="/work/src",
- MKOSI_GID=str(state.gid),
- MKOSI_UID=str(state.uid),
+ MKOSI_GID=str(InvokingUser.gid),
+ MKOSI_UID=str(InvokingUser.uid),
SCRIPT="/work/prepare",
SRCDIR=str(Path.cwd()),
WITH_DOCS=one_zero(state.config.with_docs),
CHROOT_SCRIPT="/work/build-script",
CHROOT_SRCDIR="/work/src",
DESTDIR=str(state.install_dir),
- MKOSI_GID=str(state.gid),
- MKOSI_UID=str(state.uid),
+ MKOSI_GID=str(InvokingUser.gid),
+ MKOSI_UID=str(InvokingUser.uid),
OUTPUTDIR=str(state.staging),
SCRIPT="/work/build-script",
SRCDIR=str(Path.cwd()),
with (
mount_build_overlay(state),\
- mount_passwd(state.name, state.uid, state.gid, state.root),\
+ mount_passwd(state.root),\
mount_volatile_overlay(state)\
):
for script in state.config.build_scripts:
CHROOT_OUTPUTDIR="/work/out",
CHROOT_SCRIPT="/work/postinst",
CHROOT_SRCDIR="/work/src",
- MKOSI_GID=str(state.gid),
- MKOSI_UID=str(state.uid),
+ MKOSI_GID=str(InvokingUser.gid),
+ MKOSI_UID=str(InvokingUser.uid),
OUTPUTDIR=str(state.staging),
SCRIPT="/work/postinst",
SRCDIR=str(Path.cwd()),
CHROOT_OUTPUTDIR="/work/out",
CHROOT_SCRIPT="/work/finalize",
CHROOT_SRCDIR="/work/src",
- MKOSI_GID=str(state.gid),
- MKOSI_UID=str(state.uid),
+ MKOSI_GID=str(InvokingUser.gid),
+ MKOSI_UID=str(InvokingUser.uid),
OUTPUTDIR=str(state.staging),
SCRIPT="/work/finalize",
SRCDIR=str(Path.cwd()),
with complete_step("Building initrd"):
args, [config] = parse_config(cmdline)
unlink_output(args, config)
- build_image(args, config, state.name, state.uid, state.gid)
+ build_image(args, config)
symlink.symlink_to(config.output_dir_or_cwd() / config.output)
return kmods
-def extract_pe_section(state: MkosiState, binary: Path, section: str, output: Path) -> None:
+def python_binary(config: MkosiConfig) -> str:
# If there's no tools tree, prefer the interpreter from MKOSI_INTERPRETER. If there is a tools
# tree, just use the default python3 interpreter.
- python = "python3" if state.config.tools_tree else os.getenv("MKOSI_INTERPRETER", "python3")
+ return "python3" if config.tools_tree else os.getenv("MKOSI_INTERPRETER", "python3")
+
+def extract_pe_section(state: MkosiState, binary: Path, section: str, output: Path) -> None:
# When using a tools tree, we want to use the pefile module from the tools tree instead of requiring that
# python-pefile is installed on the host. So we execute python as a subprocess to make sure we load
# pefile from the tools tree if one is used.
"""
)
- run([python], input=pefile)
+ run([python_binary(state.config)], input=pefile)
def build_uki(
os.utime(p, (mtime, mtime), follow_symlinks=False)
-def build_image(args: MkosiArgs, config: MkosiConfig, name: str, uid: int, gid: int) -> None:
+def build_image(args: MkosiArgs, config: MkosiConfig) -> None:
manifest = Manifest(config) if config.manifest_format else None
workspace = tempfile.TemporaryDirectory(dir=config.workspace_dir_or_cwd(), prefix=".mkosi-tmp")
with workspace, scopedenv({"TMPDIR" : workspace.name}):
- state = MkosiState(args, config, Path(workspace.name), name, uid, gid)
+ state = MkosiState(args, config, Path(workspace.name))
install_package_manager_trees(state)
with mount_base_trees(state):
def run_serve(config: MkosiConfig) -> None:
- """Serve the output directory via a tiny embedded HTTP server"""
+ """Serve the output directory via a tiny HTTP server"""
- port = 8081
+ port = "8081"
- os.chdir(config.output_dir_or_cwd())
-
- with http.server.HTTPServer(("", port), http.server.SimpleHTTPRequestHandler) as httpd:
- logging.info(f"Serving HTTP on port {port}: http://localhost:{port}/")
- httpd.serve_forever()
+ with chdir(config.output_dir_or_cwd()):
+ run([python_binary(config), "-m", "http.server", port],
+ user=InvokingUser.uid, group=InvokingUser.gid, stdin=sys.stdin, stdout=sys.stdout)
def generate_key_cert_pair(args: MkosiArgs) -> None:
"-nodes"])
-def bump_image_version(uid: int = -1, gid: int = -1) -> None:
+def bump_image_version() -> None:
"""Write current image version plus one to mkosi.version"""
- assert bool(uid) == bool(gid)
-
version = Path("mkosi.version").read_text().strip()
v = version.split(".")
logging.info(f"Increasing last component of version by one, bumping '{version}' → '{new_version}'.")
Path("mkosi.version").write_text(f"{new_version}\n")
- os.chown("mkosi.version", uid, gid)
+ os.chown("mkosi.version", InvokingUser.uid, InvokingUser.gid)
def show_docs(args: MkosiArgs) -> None:
def expand_specifier(s: str) -> str:
- return s.replace("%u", InvokingUser.name())
+ return s.replace("%u", InvokingUser.name)
def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
for config in presets:
try_import(f"mkosi.distributions.{config.distribution}")
- name = InvokingUser.name()
-
# Get the user UID/GID either on the host or in the user namespace running the build
- uid, gid = become_root()
+ become_root()
init_mount_namespace()
# For extra safety when running as root, remount a bunch of stuff read-only.
if not needs_build(args, config):
continue
- with complete_step(f"Building {config.preset or 'default'} image"),\
+ with (
+ complete_step(f"Building {config.preset or 'default'} image"),\
mount_tools(config.tools_tree),\
- mount_passwd(name, uid, gid),\
- prepend_to_environ_path(config):
-
+ mount_passwd(),\
+ prepend_to_environ_path(config)\
+ ):
# After tools have been mounted, check if we have what we need
check_tools(args, config)
config.workspace_dir,
):
if p:
- run(["mkdir", "--parents", p], user=uid, group=gid)
+ run(["mkdir", "--parents", p], user=InvokingUser.uid, group=InvokingUser.gid)
- with acl_toggle_build(config, uid):
- build_image(args, config, name, uid, gid)
+ with acl_toggle_build(config, InvokingUser.uid):
+ build_image(args, config)
# Make sure all build outputs that are not directories are owned by the user running mkosi.
for p in config.output_dir_or_cwd().iterdir():
if not p.is_dir():
- os.chown(p, uid, gid, follow_symlinks=False)
+ os.chown(p, InvokingUser.uid, InvokingUser.gid, follow_symlinks=False)
build = True
if build and args.auto_bump:
- bump_image_version(uid, gid)
+ bump_image_version()
if args.verb == Verb.build:
return
- # We want to drop privileges after mounting the last tools tree, but to unmount it we still need
- # privileges. To avoid a permission error, let's not unmount the final tools tree, since we'll exit
- # right after (and we're in a mount namespace so the /usr mount disappears when we exit)
- with mount_usr(last.tools_tree, umount=False),\
- mount_passwd(name, uid, gid, umount=False),\
- prepend_to_environ_path(last):
-
+ with (
+ mount_usr(last.tools_tree),\
+ mount_passwd(),\
+ prepend_to_environ_path(last)\
+ ):
check_tools(args, last)
- # After mounting the last tools tree, if we're not going to execute systemd-nspawn or qemu, we don't need to
- # be (fake) root anymore, so switch user to the invoking user.
- if not args.verb.needs_root() and args.verb != Verb.qemu:
- os.setresgid(gid, gid, gid)
- os.setresuid(uid, uid, uid)
-
with prepend_to_environ_path(last):
if args.verb in (Verb.shell, Verb.boot):
- with acl_toggle_boot(last, uid):
+ with acl_toggle_boot(last, InvokingUser.uid):
run_shell(args, last)
if args.verb == Verb.qemu:
- run_qemu(args, last, uid, gid)
+ run_qemu(args, last)
if args.verb == Verb.ssh:
run_ssh(args, last)
if expanduser:
if path.is_relative_to("~") and not InvokingUser.is_running_user():
- path = InvokingUser.home() / path.relative_to("~")
+ path = InvokingUser.home / path.relative_to("~")
path = path.expanduser()
if required and not path.exists():
from mkosi.log import complete_step
from mkosi.run import run
from mkosi.types import PathString
-from mkosi.util import umask
+from mkosi.util import InvokingUser, umask
from mkosi.versioncomp import GenericVersion
options: Sequence[str] = (),
type: Optional[str] = None,
read_only: bool = False,
- umount: bool = True,
lazy: bool = False,
) -> Iterator[Path]:
if not where.exists():
run(cmd)
yield where
finally:
- if umount:
- run(["umount", "--no-mtab", *(["--lazy"] if lazy else []), where])
+ run(["umount", "--no-mtab", *(["--lazy"] if lazy else []), where])
@contextlib.contextmanager
@contextlib.contextmanager
-def mount_usr(tree: Optional[Path], umount: bool = True) -> Iterator[None]:
+def mount_usr(tree: Optional[Path]) -> Iterator[None]:
if not tree:
yield
return
where=Path("/usr"),
operation="--bind",
read_only=True,
- umount=umount, lazy=True
+ lazy=True,
):
yield
finally:
@contextlib.contextmanager
-def mount_passwd(name: str, uid: int, gid: int, root: Path = Path("/"), umount: bool = True) -> Iterator[None]:
+def mount_passwd(root: Path = Path("/")) -> Iterator[None]:
"""
ssh looks up the running user in /etc/passwd and fails if it can't find the running user. To trick it, we
mount over /etc/passwd with our own file containing our user in the user namespace.
"""
with tempfile.NamedTemporaryFile(prefix="mkosi.passwd", mode="w") as passwd:
passwd.write("root:x:0:0:root:/root:/bin/sh\n")
- if uid != 0:
- passwd.write(f"{name}:x:{uid}:{gid}:{name}:/home/{name}:/bin/sh\n")
+ if InvokingUser.uid != 0:
+ name = InvokingUser.name
+ passwd.write(f"{name}:x:{InvokingUser.uid}:{InvokingUser.gid}:{name}:/home/{name}:/bin/sh\n")
passwd.flush()
- os.fchown(passwd.file.fileno(), uid, gid)
+ os.fchown(passwd.file.fileno(), InvokingUser.uid, InvokingUser.gid)
- with mount(passwd.name, root / "etc/passwd", operation="--bind", umount=umount):
+ with mount(passwd.name, root / "etc/passwd", operation="--bind"):
yield
import uuid
from collections.abc import Iterator
from pathlib import Path
-from typing import Optional
from mkosi.architecture import Architecture
from mkosi.config import (
from mkosi.run import MkosiAsyncioThread, run, spawn
from mkosi.tree import copy_tree, rmtree
from mkosi.types import PathString
-from mkosi.util import qemu_check_kvm_support, qemu_check_vsock_support
+from mkosi.util import InvokingUser, qemu_check_kvm_support, qemu_check_vsock_support
def machine_cid(config: MkosiConfig) -> int:
@contextlib.contextmanager
def start_swtpm() -> Iterator[Path]:
- with tempfile.TemporaryDirectory() as state:
- sock = Path(state) / Path("sock")
- proc = spawn([
+ with tempfile.TemporaryDirectory(prefix="mkosi-swtpm") as state:
+ # Make sure qemu can access the swtpm socket in this directory.
+ os.chown(state, InvokingUser.uid, InvokingUser.gid)
+
+ cmdline = [
"swtpm",
"socket",
"--tpm2",
"--tpmstate", f"dir={state}",
- "--ctrl", f"type=unixio,path={sock}"
- ])
+ ]
- try:
- yield sock
- finally:
- proc.terminate()
- proc.wait()
+ # We create the socket ourselves and pass the fd to swtpm to avoid race conditions where we start qemu before
+ # swtpm has had the chance to create the socket (or where we try to chown it first).
+ with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+ path = Path(state) / Path("sock")
+ sock.bind(os.fspath(path))
+ sock.listen()
+
+ # Make sure qemu can connect to the swtpm socket.
+ os.chown(path, InvokingUser.uid, InvokingUser.gid)
+
+ cmdline += ["--ctrl", f"type=unixio,fd={sock.fileno()}"]
+
+ proc = spawn(cmdline, user=InvokingUser.uid, group=InvokingUser.gid, pass_fds=(sock.fileno(),))
+
+ try:
+ yield path
+ finally:
+ proc.terminate()
+ proc.wait()
@contextlib.contextmanager
-def start_virtiofsd(directory: Path, uid: Optional[int] = None, gid: Optional[int] = None) -> Iterator[Path]:
- with tempfile.TemporaryDirectory() as state:
- # Make sure virtiofsd is allowed to create its socket in this temporary directory.
- os.chown(state, uid if uid is not None else os.getuid(), gid if gid is not None else os.getgid())
+def start_virtiofsd(directory: Path, *, uidmap: bool) -> Iterator[Path]:
+ virtiofsd = shutil.which("virtiofsd")
+ if virtiofsd is None:
+ if Path("/usr/libexec/virtiofsd").exists():
+ virtiofsd = "/usr/libexec/virtiofsd"
+ elif Path("/usr/lib/virtiofsd").exists():
+ virtiofsd = "/usr/lib/virtiofsd"
+ else:
+ die("virtiofsd must be installed to use RuntimeMounts= with mkosi qemu")
+
+ cmdline: list[PathString] = [
+ virtiofsd,
+ "--shared-dir", directory,
+ "--xattr",
+ "--posix-acl",
+ ]
+
+ # Map the given user/group to root in the virtual machine for the virtiofs instance to make sure all files
+ # created by root in the VM are owned by the user running mkosi on the host.
+ if uidmap:
+ cmdline += [
+ "--uid-map", f":0:{InvokingUser.uid}:1:",
+ "--gid-map", f":0:{InvokingUser.gid}:1:"
+ ]
+
+ # We create the socket ourselves and pass the fd to virtiofsd to avoid race conditions where we start qemu
+ # before virtiofsd has had the chance to create the socket (or where we try to chown it first).
+ with (
+ tempfile.TemporaryDirectory(prefix="mkosi-virtiofsd") as state,\
+ socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock\
+ ):
+ # Make sure qemu can access the virtiofsd socket in this directory.
+ os.chown(state, InvokingUser.uid, InvokingUser.gid)
# Make sure we can use the socket name as a unique identifier for the fs as well but make sure it's not too
# long as virtiofs tag names are limited to 36 bytes.
- sock = Path(state) / f"sock-{uuid.uuid4().hex}"[:35]
-
- virtiofsd = shutil.which("virtiofsd")
- if virtiofsd is None:
- if Path("/usr/libexec/virtiofsd").exists():
- virtiofsd = "/usr/libexec/virtiofsd"
- elif Path("/usr/lib/virtiofsd").exists():
- virtiofsd = "/usr/lib/virtiofsd"
- else:
- die("virtiofsd must be installed to use RuntimeMounts= with mkosi qemu")
-
- cmdline: list[PathString] = [
- virtiofsd,
- "--socket-path", sock,
- "--shared-dir", directory,
- "--xattr",
- "--posix-acl",
- ]
+ path = Path(state) / f"sock-{uuid.uuid4().hex}"[:35]
+ sock.bind(os.fspath(path))
+ sock.listen()
+
+ # Make sure qemu can connect to the virtiofsd socket.
+ os.chown(path, InvokingUser.uid, InvokingUser.gid)
- # Map the given user/group to root in the virtual machine for the virtiofs instance to make sure all files
- # created by root in the VM are owned by the user running mkosi on the host.
- if uid is not None:
- cmdline += ["--uid-map", f":0:{uid}:1:"]
- if gid is not None:
- cmdline += ["--gid-map", f":0:{gid}:1:"]
+ cmdline += ["--fd", str(sock.fileno())]
# virtiofsd has to run unprivileged to use the --uid-map and --gid-map options, so run it as the given
# user/group if those are provided.
- proc = spawn(cmdline, user=uid, group=gid)
+ proc = spawn(
+ cmdline,
+ user=InvokingUser.uid if uidmap else None,
+ group=InvokingUser.gid if uidmap else None,
+ pass_fds=(sock.fileno(),)
+ )
try:
- yield sock
+ yield path
finally:
proc.wait()
rmtree(tmp)
-def run_qemu(args: MkosiArgs, config: MkosiConfig, uid: int, gid: int) -> None:
+def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
if config.output_format not in (OutputFormat.disk, OutputFormat.cpio, OutputFormat.uki, OutputFormat.directory):
die(f"{config.output_format} images cannot be booted in qemu")
with contextlib.ExitStack() as stack:
for src, target in config.runtime_trees:
- sock = stack.enter_context(start_virtiofsd(src, uid, gid))
+ sock = stack.enter_context(start_virtiofsd(src, uidmap=True))
cmdline += [
"-chardev", f"socket,id={sock.name},path={sock}",
"-device", f"vhost-user-fs-pci,queue-size=1024,chardev={sock.name},tag={sock.name}",
]
if firmware == QemuFirmware.uefi and ovmf_supports_sb:
- ovmf_vars = stack.enter_context(tempfile.NamedTemporaryFile(prefix=".mkosi-"))
+ ovmf_vars = stack.enter_context(tempfile.NamedTemporaryFile(prefix="mkosi-ovmf-vars"))
shutil.copy2(find_ovmf_vars(config), Path(ovmf_vars.name))
+ # Make sure qemu can access the ephemeral vars.
+ os.chown(ovmf_vars.name, InvokingUser.uid, InvokingUser.gid)
cmdline += [
"-global", "ICH9-LPC.disable_s3=1",
"-global", "driver=cfi.pflash01,property=secure,value=on",
else:
fname = config.output_dir_or_cwd() / config.output
+ # Make sure qemu can access the ephemeral copy. Not required for directory output because we don't pass that
+ # directly to qemu, but indirectly via virtiofsd.
+ if config.output_format != OutputFormat.directory:
+ os.chown(fname, InvokingUser.uid, InvokingUser.gid)
+
if config.output_format == OutputFormat.disk and config.runtime_size:
run(["systemd-repart",
"--definitions", "",
if not root:
die("Cannot perform a direct kernel boot without a root or usr partition")
elif config.output_format == OutputFormat.directory:
- # This virtiofsd has to run as root so that it can write files owned by any uid:gid created by the VM.
- sock = stack.enter_context(start_virtiofsd(fname))
+ sock = stack.enter_context(start_virtiofsd(fname, uidmap=False))
cmdline += [
"-chardev", f"socket,id={sock.name},path={sock}",
"-device", f"vhost-user-fs-pci,queue-size=1024,chardev={sock.name},tag=root",
cmdline += config.qemu_args
cmdline += args.cmdline
- run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
+ run(
+ cmdline,
+ # On Debian/Ubuntu, only users in the kvm group can access /dev/kvm. The invoking user might be part of the
+ # kvm group, but the user namespace fake root user will definitely not be. Thus, we have to run qemu as the
+ # invoking user to make sure we can access /dev/kvm. Of course, if we were invoked as root, none of this
+ # matters as the root user will always be able to access /dev/kvm.
+ user=InvokingUser.uid if not InvokingUser.invoked_as_root else None,
+ group=InvokingUser.gid if not InvokingUser.invoked_as_root else None,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ env=os.environ,
+ log=False,
+ )
if status := int(notifications.get("EXIT_STATUS", 0)):
raise subprocess.CalledProcessError(status, cmdline)
cmd += args.cmdline
- run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
+ run(
+ cmd,
+ user=InvokingUser.uid,
+ group=InvokingUser.gid,
+ stdin=sys.stdin,
+ stdout=sys.stdout,
+ env=os.environ,
+ log=False,
+ )
return int(start)
-def become_root() -> tuple[int, int]:
+def become_root() -> None:
"""
Set up a new user namespace mapping using /etc/subuid and /etc/subgid.
The current user will be mapped to root and 65436 will be mapped to the UID/GID of the invoking user.
The other IDs will be mapped through.
- The function returns the UID-GID pair of the invoking user in the namespace (65436, 65436).
+ The function modifies the uid, gid of the InvokingUser object to the uid, gid of the invoking user in the user
+ namespace.
"""
if os.getuid() == 0:
- return InvokingUser.uid_gid()
+ return
subuid = read_subrange(Path("/etc/subuid"))
subgid = read_subrange(Path("/etc/subgid"))
os.setresgid(0, 0, 0)
os.setgroups([0])
- return SUBRANGE - 100, SUBRANGE - 100
+ InvokingUser.uid = SUBRANGE - 100
+ InvokingUser.gid = SUBRANGE - 100
def init_mount_namespace() -> None:
stderr: _FILE = None,
user: Optional[int] = None,
group: Optional[int] = None,
+ pass_fds: Sequence[int] = (),
) -> Popen:
if ARG_DEBUG.get():
logging.info(f"+ {shlex.join(os.fspath(s) for s in cmdline)}")
text=True,
user=user,
group=group,
+ pass_fds=pass_fds,
preexec_fn=foreground,
)
except FileNotFoundError:
class MkosiState:
"""State related properties."""
- def __init__(self, args: MkosiArgs, config: MkosiConfig, workspace: Path, name: str, uid: int, gid: int) -> None:
+ def __init__(self, args: MkosiArgs, config: MkosiConfig, workspace: Path) -> None:
self.args = args
self.config = config
self.workspace = workspace
- self.name = name
- self.uid = uid
- self.gid = gid
with umask(~0o755):
# Using a btrfs subvolume as the upperdir in an overlayfs results in EXDEV so make sure we create
import tempfile
from collections.abc import Iterable, Iterator, Mapping, Sequence
from pathlib import Path
-from typing import Any, Callable, Optional, TypeVar
+from typing import Any, Callable, TypeVar
from mkosi.types import PathString
class InvokingUser:
- @staticmethod
- def _uid_from_env() -> Optional[int]:
- uid = os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID")
- return int(uid) if uid is not None else None
-
- @classmethod
- def uid(cls) -> int:
- return cls._uid_from_env() or os.getuid()
-
- @classmethod
- def uid_gid(cls) -> tuple[int, int]:
- if (uid := cls._uid_from_env()) is not None:
- gid = int(os.getenv("SUDO_GID", pwd.getpwuid(uid).pw_gid))
- return uid, gid
- return os.getuid(), os.getgid()
-
- @classmethod
- def name(cls) -> str:
- return pwd.getpwuid(cls.uid()).pw_name
-
- @classmethod
- def home(cls) -> Path:
- return Path(f"~{cls.name()}").expanduser()
+ uid = int(os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID") or os.getuid())
+ gid = int(os.getenv("SUDO_GID") or os.getgid())
+ name = pwd.getpwuid(uid).pw_name
+ home = Path(f"~{name}").expanduser()
+ invoked_as_root = (uid == 0)
@classmethod
def is_running_user(cls) -> bool:
- return cls.uid() == os.getuid()
+ return cls.uid == os.getuid()
@contextlib.contextmanager