from mkosi.install import add_dropin_config_from_resource, copy_path, flock
from mkosi.log import Style, color_error, complete_step, die, log_step
from mkosi.manifest import Manifest
-from mkosi.mounts import mount_overlay, scandir_recursive
+from mkosi.mounts import mount_overlay, mount_passwd, mount_tools, scandir_recursive
from mkosi.pager import page
from mkosi.qemu import copy_ephemeral, machine_cid, run_qemu
from mkosi.remove import unlink_try_hard
-from mkosi.run import (
- become_root,
- bwrap,
- bwrap_cmd,
- chroot_cmd,
- fork_and_wait,
- run,
- spawn,
-)
+from mkosi.run import become_root, bwrap, chroot_cmd, init_mount_namespace, run, spawn
from mkosi.state import MkosiState
from mkosi.types import PathString
from mkosi.util import (
format_rlimit,
is_apt_distribution,
is_portage_distribution,
- prepend_to_environ_path,
+ tmp_dir,
+ try_import,
)
MKOSI_COMMANDS_NEED_BUILD = (Verb.build, Verb.shell, Verb.boot, Verb.qemu, Verb.serve)
shutil.unpack_archive(path, d)
bases += [d]
elif path.suffix == ".raw":
- # We want to use bwrap() here but it doesn't propagate mounts so we use run() instead.
run(["systemd-dissect", "-M", path, d])
stack.callback(lambda: run(["systemd-dissect", "-U", d]))
bases += [d]
with complete_step("Running prepare script in build overlay…"), mount_build_overlay(state):
bwrap(
["chroot", "/work/prepare", "build"],
- tools=state.config.tools_tree,
apivfs=state.root,
scripts=dict(chroot=chroot_cmd(state.root, options=options, network=True)),
env=dict(SRCDIR="/work/src") | state.environment,
with complete_step("Running prepare script…"):
bwrap(
["chroot", "/work/prepare", "final"],
- tools=state.config.tools_tree,
apivfs=state.root,
scripts=dict(chroot=chroot_cmd(state.root, options=options, network=True)),
env=dict(SRCDIR="/work/src") | state.environment,
with complete_step("Running postinstall script…"):
bwrap(
["chroot", "/work/postinst", "final"],
- tools=state.config.tools_tree,
apivfs=state.root,
scripts=dict(
chroot=chroot_cmd(
return
with complete_step("Running finalize script…"):
- bwrap([state.config.finalize_script],
- root=state.config.tools_tree,
- env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.staging)})
+ run([state.config.finalize_script],
+ env={**state.environment, "BUILDROOT": str(state.root), "OUTPUTDIR": str(state.staging)})
def certificate_common_name(state: MkosiState, certificate: Path) -> str:
- output = bwrap([
+ output = run([
"openssl",
"x509",
"-noout",
"-subject",
"-nameopt", "multiline",
"-in", certificate,
- ], root=state.config.tools_tree, stdout=subprocess.PIPE).stdout
+ ], stdout=subprocess.PIPE).stdout
for line in output.splitlines():
if not line.strip().startswith("commonName"):
# pesign takes a certificate directory and a certificate common name as input arguments, so we have
# to transform our input key and cert into that format. Adapted from
# https://www.mankier.com/1/pesign#Examples-Signing_with_the_certificate_and_private_key_in_individual_files
- bwrap(["openssl",
- "pkcs12",
- "-export",
- # Arcane incantation to create a pkcs12 certificate without a password.
- "-keypbe", "NONE",
- "-certpbe", "NONE",
- "-nomaciter",
- "-passout", "pass:",
- "-out", state.workspace / "secure-boot.p12",
- "-inkey", state.config.secure_boot_key,
- "-in", state.config.secure_boot_certificate],
- root=state.config.tools_tree)
-
- bwrap(["pk12util",
- "-K", "",
- "-W", "",
- "-i", state.workspace / "secure-boot.p12",
- "-d", state.workspace / "pesign"],
- root=state.config.tools_tree)
+ run(["openssl",
+ "pkcs12",
+ "-export",
+ # Arcane incantation to create a pkcs12 certificate without a password.
+ "-keypbe", "NONE",
+ "-certpbe", "NONE",
+ "-nomaciter",
+ "-passout", "pass:",
+ "-out", state.workspace / "secure-boot.p12",
+ "-inkey", state.config.secure_boot_key,
+ "-in", state.config.secure_boot_certificate])
+
+ run(["pk12util",
+ "-K", "",
+ "-W", "",
+ "-i", state.workspace / "secure-boot.p12",
+ "-d", state.workspace / "pesign"])
def install_boot_loader(state: MkosiState) -> None:
if (state.config.secure_boot_sign_tool == SecureBootSignTool.sbsign or
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("sbsign") is not None):
- bwrap(["sbsign",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", output,
- input],
- root=state.config.tools_tree)
+ run(["sbsign",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", output,
+ input])
elif (state.config.secure_boot_sign_tool == SecureBootSignTool.pesign or
state.config.secure_boot_sign_tool == SecureBootSignTool.auto and
shutil.which("pesign") is not None):
pesign_prepare(state)
- bwrap(["pesign",
- "--certdir", state.workspace / "pesign",
- "--certificate", certificate_common_name(state, state.config.secure_boot_certificate),
- "--sign",
- "--force",
- "--in", input,
- "--out", output],
- root=state.config.tools_tree)
+ run(["pesign",
+ "--certdir", state.workspace / "pesign",
+ "--certificate", certificate_common_name(state, state.config.secure_boot_certificate),
+ "--sign",
+ "--force",
+ "--in", input,
+ "--out", output])
else:
die("One of sbsign or pesign is required to use SecureBoot=")
with complete_step("Installing boot loader…"):
- bwrap(["bootctl", "install", "--root", state.root, "--all-architectures"],
- env={"SYSTEMD_ESP_PATH": "/efi"}, root=state.config.tools_tree)
+ run(["bootctl", "install", "--root", state.root, "--all-architectures"],
+ env={"SYSTEMD_ESP_PATH": "/efi"})
if state.config.secure_boot:
assert state.config.secure_boot_key
keys.mkdir(parents=True, exist_ok=True)
# sbsiglist expects a DER certificate.
- bwrap(["openssl",
- "x509",
- "-outform", "DER",
- "-in", state.config.secure_boot_certificate,
- "-out", state.workspace / "mkosi.der"],
- root=state.config.tools_tree)
- bwrap(["sbsiglist",
- "--owner", str(uuid.uuid4()),
- "--type", "x509",
- "--output", state.workspace / "mkosi.esl",
- state.workspace / "mkosi.der"],
- root=state.config.tools_tree)
+ run(["openssl",
+ "x509",
+ "-outform", "DER",
+ "-in", state.config.secure_boot_certificate,
+ "-out", state.workspace / "mkosi.der"])
+ run(["sbsiglist",
+ "--owner", str(uuid.uuid4()),
+ "--type", "x509",
+ "--output", state.workspace / "mkosi.esl",
+ state.workspace / "mkosi.der"])
# We reuse the key for all secure boot databases to keep things simple.
for db in ["PK", "KEK", "db"]:
- bwrap(["sbvarsign",
- "--attr",
- "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
- "--key", state.config.secure_boot_key,
- "--cert", state.config.secure_boot_certificate,
- "--output", keys / f"{db}.auth",
- db,
- state.workspace / "mkosi.esl"],
- root=state.config.tools_tree)
+ run(["sbvarsign",
+ "--attr",
+ "NON_VOLATILE,BOOTSERVICE_ACCESS,RUNTIME_ACCESS,TIME_BASED_AUTHENTICATED_WRITE_ACCESS",
+ "--key", state.config.secure_boot_key,
+ "--cert", state.config.secure_boot_certificate,
+ "--output", keys / f"{db}.auth",
+ db,
+ state.workspace / "mkosi.esl"])
def install_base_trees(state: MkosiState) -> None:
elif path.suffix == ".tar":
shutil.unpack_archive(path, state.root)
elif path.suffix == ".raw":
- bwrap(["systemd-dissect", "--copy-from", path, "/", state.root],
- root=state.config.tools_tree)
+ run(["systemd-dissect", "--copy-from", path, "/", state.root])
else:
die(f"Unsupported base tree source {path}")
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
+ copy_path(source, t, preserve_owner=False)
else:
shutil.unpack_archive(source, t)
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
+ copy_path(source, t, preserve_owner=False)
else:
shutil.unpack_archive(source, t)
t.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
if source.is_dir() or target:
- copy_path(source, t, preserve_owner=False, root=state.config.tools_tree)
+ copy_path(source, t, preserve_owner=False)
else:
shutil.unpack_archive(source, t)
return
with complete_step("Copying in build tree…"):
- copy_path(state.install_dir, state.root, root=state.config.tools_tree)
+ copy_path(state.install_dir, state.root)
def gzip_binary() -> str:
]
with complete_step("Creating archive…"):
- bwrap(cmd, root=state.config.tools_tree)
+ run(cmd)
def find_files(dir: Path, root: Path) -> Iterator[Path]:
def make_cpio(state: MkosiState, files: Iterator[Path], output: Path) -> None:
- with complete_step(f"Creating cpio {output}…"), bwrap_cmd(root=state.config.tools_tree) as bwrap:
+ with complete_step(f"Creating cpio {output}…"):
cmd: list[PathString] = [
- *bwrap,
"cpio",
"-o",
"--reproducible",
# We could run modinfo once for each module but that's slow. Luckily we can pass multiple modules to
# modinfo and it'll process them all in a single go. We get the modinfo for all modules to build two maps
# that map the path of the module to its module dependencies and its firmware dependencies respectively.
- info = bwrap(["modinfo", "--basedir", state.root, "--set-version", kver, "--null", *nametofile.keys(), *builtin],
- stdout=subprocess.PIPE, root=state.config.tools_tree).stdout
+ info = run(["modinfo", "--basedir", state.root, "--set-version", kver, "--null", *nametofile.keys(), *builtin],
+ stdout=subprocess.PIPE).stdout
moddep = {}
firmwaredep = {}
config = presets[0]
unlink_output(args, config)
- build_image(args, config, state.uid, state.gid)
+ build_image(args, config)
initrds = [config.output_dir / config.output]
if state.config.kernel_modules_initrd:
cmd += [gen_kernel_modules_initrd(state, kver)]
- bwrap(cmd, root=state.config.tools_tree)
+ run(cmd)
if not state.staging.joinpath(state.config.output_split_uki).exists():
shutil.copy(boot_binary, state.staging / state.config.output_split_uki)
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- bwrap(compressor_command(compression), stdin=i, stdout=o, root=state.config.tools_tree)
- os.chown(dst, uid=state.uid, gid=state.gid)
+ run(compressor_command(compression), stdin=i, stdout=o)
def copy_nspawn_settings(state: MkosiState) -> None:
state.staging / state.config.output_checksum,
]
- bwrap(
+ run(
cmdline,
# Do not output warnings about keyring permissions
stderr=subprocess.DEVNULL,
Path(os.environ['HOME']).joinpath('.gnupg')
)
},
- root=state.config.tools_tree,
)
process_kernel_modules(state, kver)
with complete_step(f"Running depmod for {kver}"):
- bwrap(["depmod", "--all", "--basedir", state.root, kver], root=state.config.tools_tree)
+ run(["depmod", "--all", "--basedir", state.root, kver])
def run_sysusers(state: MkosiState) -> None:
    """Create system users and groups inside the image via systemd-sysusers."""
    with complete_step("Generating system users"):
        cmdline = ["systemd-sysusers", "--root", state.root]
        run(cmdline)
def run_preset(state: MkosiState) -> None:
    """Enable/disable services in the image according to the distro preset policy."""
    with complete_step("Applying presets…"):
        cmdline = ["systemctl", "--root", state.root, "preset-all"]
        run(cmdline)
def run_hwdb(state: MkosiState) -> None:
    """Rebuild the udev hardware database (/usr copy) inside the image."""
    with complete_step("Generating hardware database"):
        cmdline = ["systemd-hwdb", "--root", state.root, "--usr", "--strict", "update"]
        run(cmdline)
def run_firstboot(state: MkosiState) -> None:
return
with complete_step("Applying first boot settings"):
- bwrap(["systemd-firstboot", "--root", state.root, "--force", *options],
- root=state.config.tools_tree)
+ run(["systemd-firstboot", "--root", state.root, "--force", *options])
# Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
# /usr/lib/credstore.
if not selinux.exists():
return
- policy = bwrap(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"],
- stdout=subprocess.PIPE, root=state.config.tools_tree).stdout.strip()
+ policy = run(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"], stdout=subprocess.PIPE).stdout.strip()
if not policy:
return
with complete_step(f"Relabeling files using {policy} policy"):
bwrap(
cmd=["chroot", "sh", "-c", cmd],
- tools=state.config.tools_tree,
apivfs=state.root,
scripts=dict(chroot=chroot_cmd(state.root)),
env=state.environment,
env[option] = value
with complete_step("Generating disk image"):
- output = json.loads(bwrap(cmdline, stdout=subprocess.PIPE, env=env,
- root=state.config.tools_tree).stdout)
+ output = json.loads(run(cmdline, stdout=subprocess.PIPE, env=env).stdout)
roothash = usrhash = None
for p in output:
def finalize_staging(state: MkosiState) -> None:
for f in state.staging.iterdir():
- if not f.is_dir():
- os.chown(f, state.uid, state.gid)
-
shutil.move(f, state.config.output_dir)
-def build_image(args: MkosiArgs, config: MkosiConfig, uid: int, gid: int) -> None:
- state = MkosiState(args, config, uid, gid)
+def build_image(args: MkosiArgs, config: MkosiConfig) -> None:
+ state = MkosiState(args, config)
manifest = Manifest(config)
# Make sure tmpfiles' aging doesn't interfere with our workspace
# while we are working on it.
- with flock(state.workspace), acl_toggle_build(state):
+ with flock(state.workspace):
install_package_manager_trees(state)
with mount_image(state):
if not output_base.exists() or output_base.is_symlink():
output_base.unlink(missing_ok=True)
output_base.symlink_to(state.config.output_with_compression)
- os.chown(output_base, uid, gid, follow_symlinks=False)
print_output_size(config.output_dir / config.output)
options += ["--bind", state.config.build_dir, "/work/build"]
env |= dict(BUILDDIR="/work/build")
- # build-script output goes to stdout so we can run language servers from within mkosi
- # build-scripts. See https://github.com/systemd/mkosi/pull/566 for more information.
bwrap(
["chroot", "/work/build-script"],
- tools=state.config.tools_tree,
apivfs=state.root,
scripts=dict(chroot=chroot_cmd(state.root, options=options, network=state.config.with_network)),
env=env | state.environment,
- stdout=sys.stdout,
)
-def setfacl(config: MkosiConfig, root: Path, uid: int, allow: bool) -> None:
- bwrap(["setfacl",
- "--physical",
- "--modify" if allow else "--remove",
- f"user:{uid}:rwx" if allow else f"user:{uid}",
- "-"],
- root=config.tools_tree,
- # Supply files via stdin so we don't clutter --debug run output too much
- input="\n".join([str(root),
- *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
def setfacl(root: Path, uid: int, allow: bool) -> None:
    """Add (allow=True) or remove the rwx ACL entry for uid on root and every directory below it."""
    if allow:
        action, entry = "--modify", f"user:{uid}:rwx"
    else:
        action, entry = "--remove", f"user:{uid}"

    # Supply the target paths on stdin ("-") so a --debug run's command-line
    # logging isn't cluttered by a potentially huge file list.
    targets = [str(root)] + [
        e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir()
    ]
    run(["setfacl", "--physical", action, entry, "-"], input="\n".join(targets))
# getfacl complains about absolute paths so make sure we pass a relative one.
if root.exists():
- has_acl = f"user:{uid}:rwx" in bwrap([
- "getfacl", "-n", root.relative_to(Path.cwd())],
- stdout=subprocess.PIPE,
- root=config.tools_tree,
- ).stdout
+ has_acl = f"user:{uid}:rwx" in run(["getfacl", "-n", root.relative_to(Path.cwd())],
+ stdout=subprocess.PIPE).stdout
if not has_acl and not always:
yield
try:
if has_acl:
with complete_step(f"Removing ACLs from {root}"):
- setfacl(config, root, uid, allow=False)
+ setfacl(root, uid, allow=False)
yield
finally:
if has_acl or always:
with complete_step(f"Adding ACLs to {root}"):
- setfacl(config, root, uid, allow=True)
+ setfacl(root, uid, allow=True)
@contextlib.contextmanager
def acl_toggle_build(config: MkosiConfig, uid: int) -> Iterator[None]:
    """Temporarily drop the invoking user's ACLs on build inputs and (re)grant them on caches/outputs.

    No-op when ACL management is disabled in the configuration.
    """
    if not config.acl:
        yield
        return

    # Input trees: only toggle ACLs that are already present (always=False).
    sources = [
        *config.base_trees,
        *[tree[0] for tree in config.extra_trees],
        *[tree[0] for tree in config.skeleton_trees],
    ]

    with contextlib.ExitStack() as stack:
        for src in sources:
            if src and src.is_dir():
                stack.enter_context(acl_maybe_toggle(config, src, uid, always=False))

        # Caches and build dirs must end up user-accessible, so always toggle.
        for dst in (config.cache_dir, config.build_dir):
            if dst:
                stack.enter_context(acl_maybe_toggle(config, dst, uid, always=True))

        if config.output_format == OutputFormat.directory:
            stack.enter_context(acl_maybe_toggle(config, config.output_dir / config.output, uid, always=True))

        yield
@contextlib.contextmanager
def acl_toggle_boot(config: MkosiConfig, uid: int) -> Iterator[None]:
    """Drop the user's ACLs on a directory image for the duration of a boot, then restore them.

    Only relevant for directory output with ACL management enabled; otherwise a no-op.
    """
    if config.acl and config.output_format == OutputFormat.directory:
        with acl_maybe_toggle(config, config.output_dir / config.output, uid, always=False):
            yield
    else:
        yield
fname = config.output_dir / config.output
if config.output_format == OutputFormat.disk and args.verb == Verb.boot:
- bwrap(["systemd-repart",
- "--image", fname,
- "--size", "8G",
- "--no-pager",
- "--dry-run=no",
- "--offline=no",
- fname],
- root=config.tools_tree)
+ run(["systemd-repart",
+ "--image", fname,
+ "--size", "8G",
+ "--no-pager",
+ "--dry-run=no",
+ "--offline=no",
+ fname])
if config.output_format == OutputFormat.directory:
cmdline += ["--directory", fname]
cmdline += ["--"]
cmdline += args.cmdline
- stack.enter_context(acl_toggle_boot(config))
-
- bwrap(cmdline,
- stdin=sys.stdin,
- stdout=sys.stdout,
- env=os.environ,
- log=False,
- root=config.tools_tree)
+ run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
def run_ssh(args: MkosiArgs, config: MkosiConfig) -> None:
cmd += args.cmdline
- bwrap(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False, root=config.tools_tree)
+ run(cmd, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
def run_serve(config: MkosiConfig) -> None:
run(cmd)
-def bump_image_version() -> None:
+def bump_image_version(uid: Optional[int] = None, gid: Optional[int] = None) -> None:
"""Write current image version plus one to mkosi.version"""
+ assert bool(uid) == bool(gid)
version = Path("mkosi.version").read_text().strip()
v = version.split(".")
logging.info(f"Increasing last component of version by one, bumping '{version}' → '{new_version}'.")
Path("mkosi.version").write_text(f"{new_version}\n")
+ if uid and gid:
+ os.chown("mkosi.version", uid, gid)
def expand_specifier(s: str) -> str:
return args.verb in MKOSI_COMMANDS_NEED_BUILD and (args.force > 0 or not config.output_dir.joinpath(config.output_with_compression).exists())
@contextlib.contextmanager
def prepend_to_environ_path(config: MkosiConfig) -> Iterator[None]:
    """Temporarily put config.extra_search_paths at the front of $PATH.

    Non-directory entries (e.g. single executables) are exposed through
    symlinks in a temporary directory that is itself added to $PATH. Does
    nothing when a tools tree is in use, since the tools tree replaces /usr
    wholesale, or when there are no extra search paths.
    """
    if config.tools_tree or not config.extra_search_paths:
        yield
        return

    with tempfile.TemporaryDirectory(prefix="mkosi.path", dir=tmp_dir()) as d:
        # Single files can't be put on $PATH directly, so link them into d.
        for p in config.extra_search_paths:
            if not p.is_dir():
                Path(d).joinpath(p.name).symlink_to(p.absolute())

        old = os.getenv("PATH", "").split(":")
        new = [os.fspath(p) for p in [Path(d), *config.extra_search_paths] if p.is_dir()]
        os.environ["PATH"] = ":".join(new + old)

        try:
            yield
        finally:
            os.environ["PATH"] = ":".join(old)
+
+
def run_verb(args: MkosiArgs, presets: Sequence[MkosiConfig]) -> None:
if args.verb in MKOSI_COMMANDS_SUDO:
check_root()
if args.verb == Verb.build and not args.force:
check_outputs(config)
+ # Because we overmount /usr when using a tools tree, we need to make sure we load all python modules we
+ # might end up using before overmounting /usr. Any modules that might be dynamically loaded during
+ # execution are forcibly loaded early here.
+ try_import("importlib.readers")
+ try_import("importlib.resources.readers")
+ for config in presets:
+ try_import(f"mkosi.distributions.{config.distribution}")
+
+ name = InvokingUser.name()
+
+ # Get the user UID/GID either on the host or in the user namespace running the build
+ uid, gid = become_root()
+ init_mount_namespace()
+
+ # For extra safety when running as root, remount a bunch of stuff read-only.
+ for d in ("/usr", "/etc", "/opt", "/srv", "/boot", "/efi"):
+ if Path(d).exists():
+ run(["mount", "--rbind", d, d, "--options", "ro"])
+
# First, process all directory removals because otherwise if different presets share directories a later
# preset could end up output generated by an earlier preset.
if not needs_build(args, config) and args.verb != Verb.clean:
continue
- def target() -> None:
- become_root()
- unlink_output(args, config)
-
- fork_and_wait(target)
+ unlink_output(args, config)
if args.verb == Verb.clean:
return
if not needs_build(args, config):
continue
- with prepend_to_environ_path(config.extra_search_paths):
- def target() -> None:
- # Create these before changing user to make sure they're owned by the user running mkosi.
- for d in (
- config.output_dir,
- config.cache_dir,
- config.build_dir,
- config.workspace_dir,
- ):
- if d:
- d.mkdir(parents=True, exist_ok=True)
-
- # Get the user UID/GID either on the host or in the user namespace running the build
- uid, gid = become_root()
- build_image(args, config, uid, gid)
-
- # We only want to run the build in a user namespace but not the following steps. Since we
- # can't rejoin the parent user namespace after unsharing from it, let's run the build in a
- # fork so that the main process does not leave its user namespace.
- with complete_step(f"Building {config.preset or 'default'} image"):
- fork_and_wait(target)
+ with complete_step(f"Building {config.preset or 'default'} image"),\
+ mount_tools(config),\
+ prepend_to_environ_path(config):
+
+ # Create these as the invoking user to make sure they're owned by the user running mkosi.
+ for p in (
+ config.output_dir,
+ config.cache_dir,
+ config.build_dir,
+ config.workspace_dir,
+ ):
+ if p:
+ run(["mkdir", "--parents", p], user=uid, group=gid)
+
+ with acl_toggle_build(config, uid):
+ build_image(args, config)
+
+ # Make sure all build outputs that are not directories are owned by the user running mkosi.
+ for p in config.output_dir.iterdir():
+ if not p.is_dir():
+ os.chown(p, uid, gid, follow_symlinks=False)
build = True
- if build and args.auto_bump:
- bump_image_version()
+ # We want to drop privileges after mounting the last tools tree, but to unmount it we still need
+ # privileges. To avoid a permission error, let's not unmount the final tools tree, since we'll exit
+ # right after (and we're in a mount namespace so the /usr mount disappears when we exit)
+ with mount_tools(last, umount=False), mount_passwd(name, uid, gid, umount=False):
+
+ # After mounting the last tools tree, if we're not going to execute systemd-nspawn, we don't need to
+ # be (fake) root anymore, so switch user to the invoking user.
+ if args.verb not in (Verb.shell, Verb.boot):
+ os.setresgid(gid, gid, gid)
+ os.setresuid(uid, uid, uid)
+
+ if build and args.auto_bump:
+ bump_image_version(uid, gid)
- with prepend_to_environ_path(last.extra_search_paths):
- if args.verb in (Verb.shell, Verb.boot):
- run_shell(args, last)
+ with prepend_to_environ_path(last):
+ if args.verb in (Verb.shell, Verb.boot):
+ with acl_toggle_boot(last, uid):
+ run_shell(args, last)
- if args.verb == Verb.qemu:
- run_qemu(args, last)
+ if args.verb == Verb.qemu:
+ run_qemu(args, last)
- if args.verb == Verb.ssh:
- run_ssh(args, last)
+ if args.verb == Verb.ssh:
+ run_ssh(args, last)
- if args.verb == Verb.serve:
- run_serve(last)
+ if args.verb == Verb.serve:
+ run_serve(last)
import asyncio
import asyncio.tasks
-import contextlib
import ctypes
import ctypes.util
import logging
import tempfile
import textwrap
import threading
-import traceback
from pathlib import Path
from types import TracebackType
-from typing import (
- Any,
- Awaitable,
- Callable,
- Iterator,
- Mapping,
- Optional,
- Sequence,
- Tuple,
- Type,
- TypeVar,
-)
+from typing import Any, Awaitable, Mapping, Optional, Sequence, Tuple, Type, TypeVar
from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die
from mkosi.types import _FILE, CompletedProcess, PathString, Popen
os._exit(0)
- unshare(CLONE_NEWUSER|CLONE_NEWNS)
+ unshare(CLONE_NEWUSER)
event.set()
os.waitpid(child, 0)
return SUBRANGE - 100, SUBRANGE - 100
def init_mount_namespace() -> None:
    """Detach into a private mount namespace.

    Marks all mounts recursively as slave so any mounts we make afterwards do
    not propagate back to the host.
    """
    unshare(CLONE_NEWNS)
    run(["mount", "--make-rslave", "/"])
+
+
def foreground(*, new_process_group: bool = True) -> None:
"""
If we're connected to a terminal, put the process in a new process group and make that the foreground
signal.signal(signal.SIGTTOU, old)
-class RemoteException(Exception):
- """
- Stores the exception from a subprocess along with its traceback. We have to do this explicitly because
- the original traceback object cannot be pickled. When stringified, produces the subprocess stacktrace
- plus the exception message.
- """
- def __init__(self, e: BaseException, tb: traceback.StackSummary):
- self.exception = e
- self.tb = tb
-
- def __str__(self) -> str:
- return f"Traceback (most recent call last):\n{''.join(self.tb.format()).strip()}\n{type(self.exception).__name__}: {self.exception}"
-
-
def ensure_exc_info() -> Tuple[Type[BaseException], BaseException, TracebackType]:
    """Return the currently handled exception info, guaranteed non-None.

    Must be called from within an except block. Asserts that an exception is
    actually being handled so that the declared non-Optional return type holds.
    """
    exctype, exc, tb = sys.exc_info()
    # Assert all three components, not just the type: the signature promises
    # that none of them is None.
    assert exctype
    assert exc
    assert tb
    return (exctype, exc, tb)
-def excepthook(exctype: Type[BaseException], exc: BaseException, tb: Optional[TracebackType]) -> None:
- """Attach to sys.excepthook to automatically format exceptions with a RemoteException attached correctly."""
- if isinstance(exc.__cause__, RemoteException):
- print(exc.__cause__, file=sys.stderr)
- else:
- sys.__excepthook__(exctype, exc, tb)
-
-
-def fork_and_wait(target: Callable[[], T]) -> T:
- """Run the target function in the foreground in a child process and collect its backtrace if there is one."""
- pout, pin = multiprocessing.Pipe(duplex=False)
-
- pid = os.fork()
- if pid == 0:
- foreground()
-
- try:
- result = target()
- except BaseException as e:
- # Just getting the stacktrace from the traceback doesn't get us the parent frames for some reason
- # so we have to attach those manually.
- tb = traceback.StackSummary.from_list(traceback.extract_stack()[:-1] + traceback.extract_tb(e.__traceback__))
- pin.send(RemoteException(e, tb))
- else:
- pin.send(result)
- finally:
- pin.close()
-
- sys.stdout.flush()
- sys.stderr.flush()
-
- os._exit(0)
-
- try:
- os.waitpid(pid, 0)
- finally:
- foreground(new_process_group=False)
-
- result = pout.recv()
- if isinstance(result, RemoteException):
- # Reraise the original exception and attach the remote exception with full traceback as the cause.
- raise result.exception from result
-
- return result
-
def run(
cmdline: Sequence[PathString],
check: bool = True,
stdout: _FILE = None,
stderr: _FILE = None,
input: Optional[str] = None,
- text: bool = True,
+ user: Optional[int] = None,
+ group: Optional[int] = None,
env: Mapping[str, PathString] = {},
log: bool = True,
) -> CompletedProcess:
stderr=stderr,
input=input,
text=True,
+ user=user,
+ group=group,
env=env,
preexec_fn=foreground,
)
stdout: _FILE = None,
stderr: _FILE = None,
text: bool = True,
+ user: Optional[int] = None,
+ group: Optional[int] = None,
) -> Popen:
if ARG_DEBUG.get():
logging.info(f"+ {' '.join(str(s) for s in cmdline)}")
stdout=stdout,
stderr=stderr,
text=text,
+ user=user,
+ group=group,
preexec_fn=foreground,
)
except FileNotFoundError:
raise e
-@contextlib.contextmanager
-def bwrap_cmd(
+def bwrap(
+ cmd: Sequence[PathString],
*,
- root: Optional[Path] = None,
apivfs: Optional[Path] = None,
+ log: bool = True,
scripts: Mapping[str, Sequence[PathString]] = {},
-) -> Iterator[list[PathString]]:
+ env: Mapping[str, PathString] = {},
+) -> CompletedProcess:
cmdline: list[PathString] = [
"bwrap",
"--dev-bind", "/", "/",
"--chdir", Path.cwd(),
"--die-with-parent",
- "--ro-bind", (root or Path("/")) / "usr", "/usr",
]
- for d in ("/etc", "/opt", "/srv", "/boot", "/efi"):
- if Path(d).exists():
- cmdline += ["--ro-bind", d, d]
-
if apivfs:
if not (apivfs / "etc/machine-id").exists():
# Uninitialized means we want it to get initialized on first boot.
with tempfile.TemporaryDirectory(dir="/var/tmp", prefix="mkosi-var-tmp") as var_tmp,\
tempfile.TemporaryDirectory(dir="/tmp", prefix="mkosi-scripts") as d:
- for name, cmd in scripts.items():
+ for name, script in scripts.items():
# Make sure we don't end up in a recursive loop when we name a script after the binary it execs
# by removing the scripts directory from the PATH when we execute a script.
(Path(d) / name).write_text(
#!/bin/sh
PATH="$(echo $PATH | tr ':' '\n' | grep -v {Path(d)} | tr '\n' ':')"
export PATH
- exec {shlex.join(str(s) for s in cmd)} "$@"
+ exec {shlex.join(str(s) for s in script)} "$@"
"""
)
)
make_executable(Path(d) / name)
- # We modify the PATH via --setenv so that bwrap itself is looked up in PATH before we change it.
- if root:
- # If a tools tree is specified, we should ignore any local modifications made to PATH as any of
- # those binaries might not work anymore when /usr is replaced wholesale. We also make sure that
- # both /usr/bin and /usr/sbin/ are searched so that e.g. if the host is Arch and the root is
- # Debian we don't ignore the binaries from /usr/sbin in the Debian root. We also keep the scripts
- # directory in PATH as all of them are interpreted and can't be messed up by replacing /usr.
- cmdline += ["--setenv", "PATH", f"{d}:/usr/bin:/usr/sbin"]
- else:
- cmdline += ["--setenv", "PATH", f"{d}:{os.environ['PATH']}"]
+ cmdline += ["--setenv", "PATH", f"{d}:{os.environ['PATH']}"]
if apivfs:
cmdline += [
cmdline += ["sh", "-c", f"{chmod} && exec $0 \"$@\" || exit $?"]
try:
- yield cmdline
+ result = run([*cmdline, *cmd], env=env, log=False)
+ except subprocess.CalledProcessError as e:
+ if log:
+ logging.error(f"\"{' '.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
+ if ARG_DEBUG_SHELL.get():
+ run([*cmdline, "sh"], stdin=sys.stdin, check=False, env=env, log=False)
+ raise e
finally:
# Clean up some stuff that might get written by package manager post install scripts.
if apivfs:
if (apivfs / f).exists():
(apivfs / f).unlink()
-
-def bwrap(
- cmd: Sequence[PathString],
- *,
- root: Optional[Path] = None,
- apivfs: Optional[Path] = None,
- log: bool = True,
- scripts: Mapping[str, Sequence[PathString]] = {},
- # The following arguments are passed directly to run().
- stdin: _FILE = None,
- stdout: _FILE = None,
- stderr: _FILE = None,
- input: Optional[str] = None,
- check: bool = True,
- env: Mapping[str, PathString] = {},
-) -> CompletedProcess:
- with bwrap_cmd(root=root, apivfs=apivfs, scripts=scripts) as bwrap:
- try:
- result = run(
- [*bwrap, *cmd],
- text=True,
- env=env,
- log=False,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- input=input,
- check=check,
- )
- except subprocess.CalledProcessError as e:
- if log:
- logging.error(f"\"{' '.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
- if ARG_DEBUG_SHELL.get():
- run(
- [*bwrap, "sh"],
- stdin=sys.stdin,
- check=False,
- env=env,
- log=False,
- )
- raise e
-
return result