(context.root / "etc/machine-id").write_text("uninitialized\n")
# Ensure /efi exists so that the ESP is mounted there, as recommended by
- # https://0pointer.net/blog/linux-boot-partitions.html. Use the most restrictive access mode we
- # can without tripping up mkfs tools since this directory is only meant to be overmounted and
- # should not be read from or written to.
+ # https://0pointer.net/blog/linux-boot-partitions.html. Use the most restrictive access
+ # mode we can without tripping up mkfs tools since this directory is only meant to be
+ # overmounted and should not be read from or written to.
with umask(~0o500):
(context.root / "efi").mkdir(exist_ok=True)
(context.root / "boot").mkdir(exist_ok=True)
- # Ensure /boot/loader/entries.srel exists and has type1 written to it to nudge kernel-install towards using
- # the boot loader specification layout.
+ # Ensure /boot/loader/entries.srel exists and has "type1" written to it to nudge
+ # kernel-install towards using the boot loader specification layout.
with umask(~0o700):
(context.root / "boot/loader").mkdir(exist_ok=True)
with umask(~0o600):
def finalize_scripts(config: Config, scripts: Mapping[str, Sequence[PathString]]) -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="mkosi-scripts-") as d:
for name, script in scripts.items():
- # Make sure we don't end up in a recursive loop when we name a script after the binary it execs
- # by removing the scripts directory from the PATH when we execute a script.
+ # Make sure we don't end up in a recursive loop when we name a script after the binary
+ # it execs by removing the scripts directory from the PATH when we execute a script.
with (Path(d) / name).open("w") as f:
f.write("#!/bin/sh\n")
if context.config.find_binary(binary):
scripts[binary] = (binary, "--root", "/buildroot")
if ukify := context.config.find_binary("ukify"):
- # A script will always run with the tools tree mounted so we pass binary=None to disable the conditional search
- # logic of python_binary() depending on whether the binary is in an extra search path or not.
+ # A script will always run with the tools tree mounted, so we pass binary=None to disable
+ # the conditional search logic of python_binary() depending on whether the binary is in an
+ # extra search path or not.
scripts["ukify"] = (python_binary(context.config, binary=None), ukify)
return finalize_scripts(context.config, scripts | dict(helpers))
if config.profile:
env["PROFILE"] = config.profile
- # We make sure to mount everything in to make ssh work since syncing might involve git which could invoke ssh.
+ # We make sure to mount everything in to make ssh work since syncing might involve git which
+ # could invoke ssh.
if agent := os.getenv("SSH_AUTH_SOCK"):
env["SSH_AUTH_SOCK"] = agent
]
if (p := INVOKING_USER.home()).exists() and p != Path("/"):
- # We use a writable mount here to keep git worktrees working which encode absolute paths to the parent
- # git repository and might need to modify the git config in the parent git repository when submodules
- # are in use as well.
+ # We use a writable mount here to keep git worktrees working which encode absolute
+ # paths to the parent git repository and might need to modify the git config in the
+ # parent git repository when submodules are in use as well.
options += ["--bind", p, p]
env["HOME"] = os.fspath(p)
if (p := Path(f"/run/user/{os.getuid()}")).exists():
env=env | context.config.environment,
sandbox=context.sandbox(
binary=None,
- # postoutput scripts should run as (fake) root so that file ownership is always recorded as if
- # owned by root.
+ # postoutput scripts should run as (fake) root so that file ownership is
+ # always recorded as if owned by root.
options=[
"--ro-bind", script, "/work/postoutput",
"--ro-bind", json, "/work/config.json",
vmlinuz = context.root / "usr/lib/modules" / kver / type
if not vmlinuz.parent.exists():
continue
- # Some distributions (OpenMandriva) symlink /usr/lib/modules/<kver>/vmlinuz to /boot/vmlinuz-<kver>, so
- # get rid of the symlink and copy the actual vmlinuz to /usr/lib/modules/<kver>.
+ # Some distributions (OpenMandriva) symlink /usr/lib/modules/<kver>/vmlinuz to
+ # /boot/vmlinuz-<kver>, so get rid of the symlink and copy the actual vmlinuz to
+ # /usr/lib/modules/<kver>.
if vmlinuz.is_symlink() and vmlinuz.resolve().is_relative_to("/boot"):
vmlinuz.unlink()
if not vmlinuz.exists():
"--repository-key-fetch", str(config.repository_key_fetch),
"--repositories", ",".join(config.repositories),
"--sandbox-tree", ",".join(str(t) for t in config.sandbox_trees),
- # Note that when compress_output == Compression.none == 0 we don't pass --compress-output which means the
- # default compression will get picked. This is exactly what we want so that initrds are always compressed.
+ # Note that when compress_output == Compression.none == 0 we don't pass --compress-output
+ # which means the default compression will get picked. This is exactly what we want so that
+ # initrds are always compressed.
*(["--compress-output", str(config.compress_output)] if config.compress_output else []),
"--compress-level", str(config.compress_level),
"--with-network", str(config.with_network),
# Ubuntu Focal's kernel does not support zstd-compressed initrds so use xz instead.
if context.config.distribution == Distribution.ubuntu and context.config.release == "focal":
compression = Compression.xz
- # Older Debian and Ubuntu releases do not compress their kernel modules, so we compress the initramfs instead.
- # Note that this is not ideal since the compressed kernel modules will all be decompressed on boot which
- # requires significant memory.
+ # Older Debian and Ubuntu releases do not compress their kernel modules, so we compress the
+ # initramfs instead. Note that this is not ideal since the compressed kernel modules will
+ # all be decompressed on boot which requires significant memory.
elif context.config.distribution == Distribution.debian and context.config.release in ("sid", "testing"):
compression = Compression.none
else:
cmdline: Sequence[str],
output: Path,
) -> None:
- # Older versions of systemd-stub expect the cmdline section to be null terminated. We can't embed
- # nul terminators in argv so let's communicate the cmdline via a file instead.
+ # Older versions of systemd-stub expect the cmdline section to be null terminated. We can't
+ # embed NUL terminators in argv so let's communicate the cmdline via a file instead.
(context.workspace / "cmdline").write_text(f"{' '.join(cmdline).strip()}\x00")
if not (arch := context.config.architecture.to_efi()):
if want_signed_pcrs(context.config):
cmd += [
"--pcr-private-key", context.config.secure_boot_key,
- # SHA1 might be disabled in OpenSSL depending on the distro so we opt to not sign for SHA1 to avoid
- # having to manage a bunch of configuration to re-enable SHA1.
+ # SHA1 might be disabled in OpenSSL depending on the distro so we opt to not sign
+ # for SHA1 to avoid having to manage a bunch of configuration to re-enable SHA1.
"--pcr-banks", "sha256",
]
if context.config.secure_boot_key.exists():
sdmagic_text = sdmagic.read_text().strip("\x00")
- # Older versions of the stub have misaligned sections which results in an empty sdmagic text. Let's check for that
- # explicitly and treat it as no version.
+ # Older versions of the stub have misaligned sections which results in an empty sdmagic text.
+ # Let's check for that explicitly and treat it as no version.
+ #
# TODO: Drop this logic once every distribution we support ships systemd-stub v254 or newer.
if not sdmagic_text:
return None
def install_kernel(context: Context, partitions: Sequence[Partition]) -> None:
# Iterates through all kernel versions included in the image and generates a combined
- # kernel+initrd+cmdline+osrelease EFI file from it and places it in the /EFI/Linux directory of the ESP.
- # sd-boot iterates through them and shows them in the menu. These "unified" single-file images have the
- # benefit that they can be signed like normal EFI binaries, and can encode everything necessary to boot a
- # specific root device, including the root hash.
+ # kernel+initrd+cmdline+osrelease EFI file from it and places it in the /EFI/Linux directory of
+ # the ESP. sd-boot iterates through them and shows them in the menu. These "unified"
+ # single-file images have the benefit that they can be signed like normal EFI binaries, and can
+ # encode everything necessary to boot a specific root device, including the root hash.
if context.config.output_format in (OutputFormat.uki, OutputFormat.esp):
return
# Extract the combined initrds from the UKI so we can use it to direct kernel boot with qemu if needed.
extract_pe_section(context, uki, ".initrd", context.staging / context.config.output_split_initrd)
- # ukify will have signed the kernel image as well. Let's make sure we put the signed kernel image in the output
- # directory instead of the unsigned one by reading it from the UKI.
+ # ukify will have signed the kernel image as well. Let's make sure we put the signed kernel
+ # image in the output directory instead of the unsigned one by reading it from the UKI.
extract_pe_section(context, uki, ".linux", context.staging / context.config.output_split_kernel)
def check_inputs(config: Config) -> None:
"""
- Make sure all the inputs exist that aren't checked during config parsing because they might be created by an
- earlier build.
+ Make sure all the inputs exist that aren't checked during config parsing because they might
+ be created by an earlier build.
"""
for base in config.base_trees:
if not base.exists():
"--remove",
# Exclude APIVFS and temporary files directories.
*(f"--exclude-prefix={d}" for d in ("/tmp", "/var/tmp", "/run", "/proc", "/sys", "/dev")),
- # Exclude /var if we're not invoked as root as all the chown()'s for daemon owned directories will fail
+ # Exclude /var if we're not invoked as root as all the chown()'s for daemon owned
+ # directories will fail.
*(["--exclude-prefix=/var"] if os.getuid() != 0 or userns_has_single_user() else []),
],
env={"SYSTEMD_TMPFILES_FORCE_SUBVOL": "0"},
- # systemd-tmpfiles can exit with DATAERR or CANTCREAT in some cases which are handled as success by the
- # systemd-tmpfiles service so we handle those as success as well.
+ # systemd-tmpfiles can exit with DATAERR or CANTCREAT in some cases which are handled
+ # as success by the systemd-tmpfiles service so we handle those as success as well.
success_exit_status=(0, 65, 73),
sandbox=context.sandbox(
binary="systemd-tmpfiles",
options=[
"--bind", context.root, "/buildroot",
- # systemd uses acl.h to parse ACLs in tmpfiles snippets which uses the host's passwd so we have to
- # mount the image's passwd over it to make ACL parsing work.
+ # systemd uses acl.h to parse ACLs in tmpfiles snippets which uses the host's
+ # passwd so we have to mount the image's passwd over it to make ACL parsing
+ # work.
*finalize_passwd_mounts(context.root),
- # Sometimes directories are configured to be owned by root in tmpfiles snippets so we want to make
- # sure those chown()'s succeed by making ourselves the root user so that the root user exists.
+ # Sometimes directories are configured to be owned by root in tmpfiles snippets
+ # so we want to make sure those chown()'s succeed by making ourselves the root
+ # user so that the root user exists.
"--become-root",
],
),
run(["systemd-firstboot", "--root=/buildroot", "--force", *options],
sandbox=context.sandbox(binary="systemd-firstboot", options=["--bind", context.root, "/buildroot"]))
- # Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
- # /usr/lib/credstore.
+ # Initrds generally don't ship with only /usr so there's not much point in putting the
+ # credentials in /usr/lib/credstore.
if context.config.output_format != OutputFormat.cpio or not context.config.make_initrd:
with umask(~0o755):
(context.root / "usr/lib/credstore").mkdir(exist_ok=True)
context.staging / context.config.output_with_format,
]
options: list[PathString] = [
- # Make sure we're root so that the mkfs tools invoked by systemd-repart think the files that go
- # into the disk image are owned by root.
+ # Make sure we're root so that the mkfs tools invoked by systemd-repart think the files
+ # that go into the disk image are owned by root.
"--become-root",
"--bind", context.staging, context.staging,
]
bios = (context.config.bootable != ConfigFeature.disabled and want_grub_bios(context))
if esp or bios:
- # Even if we're doing BIOS, let's still use the ESP to store the kernels, initrds and grub
- # modules. We cant use UKIs so we have to put each kernel and initrd on the ESP twice, so
- # let's make the ESP twice as big in that case.
+ # Even if we're doing BIOS, let's still use the ESP to store the kernels, initrds
+ # and grub modules. We can't use UKIs so we have to put each kernel and initrd on
+ # the ESP twice, so let's make the ESP twice as big in that case.
(defaults / "00-esp.conf").write_text(
textwrap.dedent(
f"""\
)
)
- # If grub for BIOS is installed, let's add a BIOS boot partition onto which we can install grub.
+ # If grub for BIOS is installed, let's add a BIOS boot partition onto which we can
+ # install grub.
if bios:
(defaults / "05-bios.conf").write_text(
textwrap.dedent(
definitions = context.workspace / "esp-definitions"
definitions.mkdir(exist_ok=True)
- # Use a minimum of 36MB or 260MB depending on sector size because otherwise the generated FAT filesystem will have
- # too few clusters to be considered a FAT32 filesystem by OVMF which will refuse to boot from it.
- # See https://superuser.com/questions/1702331/what-is-the-minimum-size-of-a-4k-native-partition-when-formatted-with-fat32/1717643#1717643
+ # Use a minimum of 36MB or 260MB depending on sector size because otherwise the generated FAT
+ # filesystem will have too few clusters to be considered a FAT32 filesystem by OVMF which will
+ # refuse to boot from it. See
+ # https://superuser.com/questions/1702331/what-is-the-minimum-size-of-a-4k-native-partition-when-formatted-with-fat32/1717643#1717643
if context.config.sector_size == 512:
m = 36
# TODO: Figure out minimum size for 2K sector size
# Always reserve 10MB for filesystem metadata.
size = max(uki.stat().st_size, (m - 10) * 1024**2) + 10 * 1024**2
- # TODO: Remove the extra 4096 for the max size once https://github.com/systemd/systemd/pull/29954 is in a stable
- # release.
+ # TODO: Remove the extra 4096 for the max size once
+ # https://github.com/systemd/systemd/pull/29954 is in a stable release.
(definitions / "00-esp.conf").write_text(
textwrap.dedent(
f"""\
workdir(output),
]
options: list[PathString] = [
- # Make sure we're root so that the mkfs tools invoked by systemd-repart think the files that go
- # into the disk image are owned by root.
+ # Make sure we're root so that the mkfs tools invoked by systemd-repart think the files
+ # that go into the disk image are owned by root.
"--become-root",
"--bind", output.parent, workdir(output.parent),
"--ro-bind", context.root, "/buildroot",
with tempfile.TemporaryDirectory() as tmp:
os.chmod(tmp, 0o755)
- # cp doesn't support excluding directories but we can imitate it by bind mounting an empty directory
- # over the directories we want to exclude.
+ # cp doesn't support excluding directories but we can imitate it by bind mounting
+ # an empty directory over the directories we want to exclude.
exclude: list[PathString]
if d == "cache":
exclude = flatten(
return
with umask(~0o755):
- # Using a btrfs subvolume as the upperdir in an overlayfs results in EXDEV so make sure we create
- # the root directory as a regular directory if the Overlay= option is enabled.
+ # Using a btrfs subvolume as the upperdir in an overlayfs results in EXDEV so make sure we
+ # create the root directory as a regular directory if the Overlay= option is enabled.
if context.config.overlay:
context.root.mkdir()
else:
stack.callback(lambda: (config.output_dir_or_cwd() / f"{name}.nspawn").unlink(missing_ok=True))
shutil.copy2(config.nspawn_settings, config.output_dir_or_cwd() / f"{name}.nspawn")
- # If we're booting a directory image that wasn't built by root, we always make an ephemeral copy to avoid
- # ending up with files not owned by the directory image owner in the directory image.
+ # If we're booting a directory image that wasn't built by root, we always make an ephemeral
+ # copy to avoid ending up with files not owned by the directory image owner in the
+ # directory image.
if config.ephemeral or (
config.output_format == OutputFormat.directory and
args.verb == Verb.boot and
owner = os.stat(fname).st_uid
if owner != 0:
- # Let's allow running a shell in a non-ephemeral image but in that case only map a single user into the
- # image so it can't get poluted with files or directories owned by other users.
+ # Let's allow running a shell in a non-ephemeral image but in that case only map a
+ # single user into the image so it can't get polluted with files or directories
+ # owned by other users.
if args.verb == Verb.shell and config.output_format == OutputFormat.directory and not config.ephemeral:
range = 1
else:
for tree in config.runtime_trees:
target = Path("/root/src") / (tree.target or "")
- # We add norbind because very often RuntimeTrees= will be used to mount the source directory into the
- # container and the output directory from which we're running will very likely be a subdirectory of the
- # source directory which would mean we'd be mounting the container root directory as a subdirectory in
- # itself which tends to lead to all kinds of weird issues, which we avoid by not doing a recursive mount
- # which means the container root directory mounts will be skipped.
+ # We add norbind because very often RuntimeTrees= will be used to mount the source
+ # directory into the container and the output directory from which we're running will
+ # very likely be a subdirectory of the source directory which would mean we'd be
+ # mounting the container root directory as a subdirectory in itself which tends to lead
+ # to all kinds of weird issues, which we avoid by not doing a recursive mount which
+ # means the container root directory mounts will be skipped.
uidmap = "rootidmap" if tree.source.stat().st_uid != 0 else "noidmap"
cmdline += ["--bind", f"{tree.source}:{target}:norbind,{uidmap}"]
# Add nspawn options first since systemd-nspawn ignores all options after the first argument.
argv = args.cmdline
- # When invoked by the kernel, all unknown arguments are passed as environment variables to pid1. Let's
- # mimick the same behavior when we invoke nspawn as a container.
+ # When invoked by the kernel, all unknown arguments are passed as environment variables
+ # to pid1. Let's mimic the same behavior when we invoke nspawn as a container.
for arg in itertools.chain(config.kernel_command_line, config.kernel_command_line_extra):
name, sep, value = arg.partition("=")
- # If there's a '.' in the argument name, it's not considered an environment variable by the kernel.
+ # If there's a '.' in the argument name, it's not considered an environment
+ # variable by the kernel.
if sep and "." not in name:
cmdline += ["--setenv", f"{name.replace('-', '_')}={value}"]
else:
- # kernel cmdline config of the form systemd.xxx= get interpreted by systemd when running in nspawn
- # as well.
+ # kernel cmdline config of the form systemd.xxx= get interpreted by systemd
+ # when running in nspawn as well.
argv += [arg]
cmdline += argv
return (
args.force >= force or
not (config.output_dir_or_cwd() / config.output_with_compression).exists() or
- # When the output is a directory, its name is the same as the symlink we create that points to the actual
- # output when not building a directory. So if the full output path exists, we have to check that it's not
- # a symlink as well.
+ # When the output is a directory, its name is the same as the symlink we create that points
+ # to the actual output when not building a directory. So if the full output path exists, we
+ # have to check that it's not a symlink as well.
(config.output_dir_or_cwd() / config.output_with_compression).is_symlink()
)
def run_clean(args: Args, config: Config, *, resources: Path) -> None:
- # We remove any cached images if either the user used --force twice, or he/she called "clean" with it
- # passed once. Let's also remove the downloaded package cache if the user specified one additional
- # "--force".
+ # We remove any cached images if either the user used --force twice, or he/she called "clean"
+ # with it passed once. Let's also remove the downloaded package cache if the user specified one
+ # additional "--force".
- # We don't want to require a tools tree to run mkosi clean so we pass in a sandbox that disables use of the tools
- # tree. We still need a sandbox as we need to acquire privileges to be able to remove various files from the
- # rootfs.
+ # We don't want to require a tools tree to run mkosi clean so we pass in a sandbox that
+ # disables use of the tools tree. We still need a sandbox as we need to acquire privileges to
+ # be able to remove various files from the rootfs.
sandbox = functools.partial(config.sandbox, tools=False)
if args.verb == Verb.clean:
if (config.output_dir_or_cwd() / output).exists() or (config.output_dir_or_cwd() / output).is_symlink()
}
- # Make sure we resolve the symlink we create in the output directory and remove its target as well as it might
- # not be in the list of outputs anymore if the compression or output format was changed.
+ # Make sure we resolve the symlink we create in the output directory and remove its target
+ # as well as it might not be in the list of outputs anymore if the compression or output
+ # format was changed.
outputs |= {o.resolve() for o in outputs}
if outputs:
check_workspace_directory(last)
- # If we're doing an incremental build and the cache is not out of date, don't clean up the tools tree
- # so that we can reuse the previous one.
+ # If we're doing an incremental build and the cache is not out of date, don't clean up the
+ # tools tree so that we can reuse the previous one.
if (
tools and
(
):
run_clean(args, tools, resources=resources)
- # First, process all directory removals because otherwise if different images share directories a later
- # image build could end up deleting the output generated by an earlier image build.
+ # First, process all directory removals because otherwise if different images share directories
+ # a later image build could end up deleting the output generated by an earlier image build.
if needs_build(args, last) or args.wipe_build_dir:
for config in images:
run_clean(args, config, resources=resources)
run_sync_scripts(config)
for config in images:
- # If the output format is "none" and there are no build scripts, there's nothing to do so exit early.
+ # If the output format is "none" and there are no build scripts, there's nothing to
+ # do so exit early.
if config.output_format == OutputFormat.none and not config.build_scripts:
continue
# SPDX-License-Identifier: LGPL-2.1-or-later
"""
-This is a standalone implementation of sandboxing which is used by mkosi. Note that this is invoked many times while
-building the image and as a result, the performance of this script has a substantial impact on the performance of mkosi
-itself. To keep the runtime of this script to a minimum, please don't import any extra modules if it can be avoided.
+This is a standalone implementation of sandboxing which is used by mkosi. Note that this is
+invoked many times while building the image and as a result, the performance of this script has a
+substantial impact on the performance of mkosi itself. To keep the runtime of this script to a
+minimum, please don't import any extra modules if it can be avoided.
+
"""
import ctypes
def statfs(path: str) -> int:
- # struct statfs is 120 bytes, which equals 15 longs. Since we only care about the first field and the first field
- # is of type long, we avoid declaring the full struct by just passing an array of 15 longs as the output argument.
+ # struct statfs is 120 bytes, which equals 15 longs. Since we only care about the first field
+ # and the first field is of type long, we avoid declaring the full struct by just passing an
+ # array of 15 longs as the output argument.
buffer = (ctypes.c_long * 15)()
if libc.statfs(path.encode(), ctypes.byref(buffer)) < 0:
def cap_permitted_to_ambient() -> None:
"""
- When unsharing a user namespace and mapping the current user to itself, the user has a full set of capabilities in
- the user namespace. This allows the user to do mounts after unsharing a mount namespace for example. However, these
- capabilities are lost again when the user executes a subprocess. As we also want subprocesses invoked by the user
- to be able to mount stuff, we make sure the capabilities are inherited by adding all the user's capabilities to the
- inherited and ambient capabilities set, which makes sure that they are passed down to subprocesses.
+ When unsharing a user namespace and mapping the current user to itself, the user has a full
+ set of capabilities in the user namespace. This allows the user to do mounts after unsharing a
+ mount namespace for example. However, these capabilities are lost again when the user executes
+ a subprocess. As we also want subprocesses invoked by the user to be able to mount stuff, we
+ make sure the capabilities are inherited by adding all the user's capabilities to the inherited
+ and ambient capabilities set, which makes sure that they are passed down to subprocesses.
"""
header = cap_user_header_t(LINUX_CAPABILITY_VERSION_3, 0)
payload = (cap_user_data_t * LINUX_CAPABILITY_U32S_3)()
def seccomp_suppress_chown() -> None:
"""
- There's still a few files and directories left in distributions in /usr and /etc that are not owned by root. This
- causes package managers to fail to install the corresponding packages when run from a single uid user namespace.
- Unfortunately, non-root users can only create files owned by their own uid. To still allow non-root users to build
- images, if requested we install a seccomp filter that makes calls to chown() and friends a noop.
+ There are still a few files and directories left in distributions in /usr and /etc that are
+ not owned by root. This causes package managers to fail to install the corresponding packages
+ when run from a single uid user namespace. Unfortunately, non-root users can only create files
+ owned by their own uid. To still allow non-root users to build images, if requested we install
+ a seccomp filter that makes calls to chown() and friends a noop.
"""
libseccomp = ctypes.CDLL("libseccomp.so.2")
if libseccomp is None:
def mount_rbind(src: str, dst: str, attrs: int = 0) -> None:
"""
- When using the old mount syscall to do a recursive bind mount, mount options are not applied recursively. Because
- we want to do recursive read-only bind mounts in some cases, we use the new mount API for that which does allow
- recursively changing mount options when doing bind mounts.
+ When using the old mount syscall to do a recursive bind mount, mount options are not
+ applied recursively. Because we want to do recursive read-only bind mounts in some cases, we
+ use the new mount API for that which does allow recursively changing mount options when doing
+ bind mounts.
"""
-
flags = AT_NO_AUTOMOUNT | AT_RECURSIVE | AT_SYMLINK_NOFOLLOW | OPEN_TREE_CLONE
try:
def become_user(uid: int, gid: int) -> None:
"""
- This function implements the required dance to unshare a user namespace and map the current user to itself or to
- root within it. The kernel only allows a process running outside of the unshared user namespace to write the
- necessary uid and gid mappings, so we fork off a child process, make it wait until the parent process has unshared
- a user namespace, and then writes the necessary uid and gid mappings.
+ This function implements the required dance to unshare a user namespace and map the current
+ user to itself or to root within it. The kernel only allows a process running outside of the
+ unshared user namespace to write the necessary uid and gid mappings, so we fork off a child
+ process, make it wait until the parent process has unshared a user namespace, and then writes
+ the necessary uid and gid mappings.
"""
ppid = os.getpid()
else:
rest.append(fsop)
- # Drop all bind mounts that are mounted from beneath another bind mount to the same location within the new
- # rootfs.
+ # Drop all bind mounts that are mounted from beneath another bind mount to the same
+ # location within the new rootfs.
optimized = [
m for m in binds
if not any(
)
]
- # Make sure bind mounts override other operations on the same destination by appending them to the rest and
- # depending on python's stable sort behavior.
+ # Make sure bind mounts override other operations on the same destination by appending them
+ # to the rest and depending on python's stable sort behavior.
return sorted([*rest, *optimized], key=lambda fsop: splitpath(fsop.dst))
super().__init__(dst)
def execute(self, oldroot: str, newroot: str) -> None:
- # We don't put actual devices in /dev, just the API stuff in there that all manner of things depend on,
- # like /dev/null.
+ # We don't put actual devices in /dev, just the API stuff in there that all manner of
+ # things depend on, like /dev/null.
dst = chase(newroot, self.dst)
with umask(~0o755):
os.makedirs(dst, exist_ok=True)
- # Note that the mode is curcial here. If the default mode (1777) is used, trying to access /dev/null fails
- # with EACCESS for unknown reasons.
+ # Note that the mode is crucial here. If the default mode (1777) is used, trying to access
+ # /dev/null fails with EACCES for unknown reasons.
mount("tmpfs", dst, "tmpfs", 0, "mode=0755")
for node in ("null", "zero", "full", "random", "urandom", "tty"):
self.workdir = workdir
super().__init__(dst)
- # This supports being used as a context manager so we can reuse the logic for mount_overlay() in mounts.py.
+ # This supports being used as a context manager so we can reuse the logic for mount_overlay()
+ # in mounts.py.
def __enter__(self) -> None:
self.execute("/", "/")
"""
def main() -> None:
- # We don't use argparse as it takes +- 10ms to import and since this is purely for internal use, it's not necessary
- # to have good UX for this CLI interface so it's trivial to write ourselves.
+ # We don't use argparse as it takes +- 10ms to import and since this is purely for internal
+ # use, it's not necessary to have good UX for this CLI interface so it's trivial to write
+ # ourselves.
argv = list(reversed(sys.argv[1:]))
fsops: list[FSOperation] = []
setenv = []
userns = acquire_privileges(become_root=become_root)
- # If we're root in a user namespace with a single user, we're still not going to be able to chown() stuff, so check
- # for that and apply the seccomp filter as well in that case.
+ # If we're root in a user namespace with a single user, we're still not going to be able to
+ # chown() stuff, so check for that and apply the seccomp filter as well in that case.
if suppress_chown and (userns or userns_has_single_user()):
seccomp_suppress_chown()
if not userns:
mount("", "/", "", MS_SLAVE | MS_REC, "")
- # We need a workspace to setup the sandbox, the easiest way to do this in a tmpfs, since it's automatically cleaned
- # up. We need a mountpoint to put the workspace on and it can't be root, so let's use /tmp which is almost
- # guaranteed to exist.
+ # We need a workspace to set up the sandbox; the easiest way to do this is in a tmpfs, since it's
+ # automatically cleaned up. We need a mountpoint to put the workspace on and it can't be root,
+ # so let's use /tmp which is almost guaranteed to exist.
mount("tmpfs", "/tmp", "tmpfs", 0, "")
os.chdir("/tmp")
# Make the workspace in /tmp / and put the old rootfs in oldroot.
if libc.pivot_root(b".", b"oldroot") < 0:
- # pivot_root() can fail in the initramfs since / isn't a mountpoint there, so let's fall back to MS_MOVE if
- # that's the case.
+ # pivot_root() can fail in the initramfs since / isn't a mountpoint there, so let's fall
+ # back to MS_MOVE if that's the case.
# First we move the old rootfs to oldroot.
mount("/", "oldroot", "", MS_BIND | MS_REC, "")
os.chroot(".")
os.chdir(".")
- # When we use MS_MOVE we have to unmount oldroot/tmp manually to reveal the original /tmp again as it might
- # contain stuff that we want to mount into the sandbox.
+ # When we use MS_MOVE we have to unmount oldroot/tmp manually to reveal the original /tmp
+ # again as it might contain stuff that we want to mount into the sandbox.
umount2("oldroot/tmp", MNT_DETACH)
for fsop in fsops:
fsop.execute("oldroot", "newroot")
- # Now that we're done setting up the sandbox let's pivot root into newroot to make it the new root. We use the
- # pivot_root(".", ".") process described in the pivot_root() man page.
+ # Now that we're done setting up the sandbox let's pivot root into newroot to make it the new
+ # root. We use the pivot_root(".", ".") process described in the pivot_root() man page.
os.chdir("newroot")
- # We're guaranteed to have / be a mount when we get here, so pivot_root() won't fail anymore, even if we're in the
- # initramfs.
+ # We're guaranteed to have / be a mount when we get here, so pivot_root() won't fail anymore,
+ # even if we're in the initramfs.
if libc.pivot_root(b".", b".") < 0:
oserror()
# As documented in the pivot_root() man page, this will unmount the old rootfs.
umount2(".", MNT_DETACH)
- # Avoid surprises by making sure the sandbox's mount propagation is shared. This doesn't actually mean mounts get
- # propagated into the host. Instead, a new mount propagation peer group is set up.
+ # Avoid surprises by making sure the sandbox's mount propagation is shared. This doesn't
+ # actually mean mounts get propagated into the host. Instead, a new mount propagation peer
+ # group is set up.
mount("", ".", "", MS_SHARED | MS_REC, "")
if chdir:
try:
os.execvp(argv[0], argv)
except OSError as e:
- # Let's return a recognizable error when the binary we're going to execute is not found. We use 127 as that's
- # the exit code used by shells when a program to execute is not found.
+ # Let's return a recognizable error when the binary we're going to execute is not found.
+ # We use 127 as that's the exit code used by shells when a program to execute is not found.
if e.errno == ENOENT:
sys.exit(127)