with complete_step("Removing files…"):
for pattern in context.config.remove_files:
- rmtree(*context.root.glob(pattern.lstrip("/")), tools=context.config.tools(), sandbox=context.sandbox)
+ rmtree(*context.root.glob(pattern.lstrip("/")), sandbox=context.sandbox)
def install_distribution(context: Context) -> None:
@contextlib.contextmanager
-def finalize_scripts(scripts: Mapping[str, Sequence[PathString]], root: Path) -> Iterator[Path]:
+def finalize_scripts(config: Config, scripts: Mapping[str, Sequence[PathString]]) -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="mkosi-scripts") as d:
# Make sure that when mkosi-as-caller is used the scripts can still be accessed.
os.chmod(d, 0o755)
with (Path(d) / name).open("w") as f:
f.write("#!/bin/sh\n")
- if find_binary(name, root=root):
+ if config.find_binary(name):
f.write(
textwrap.dedent(
"""\
) -> AbstractContextManager[Path]:
scripts: dict[str, Sequence[PathString]] = {}
for binary in ("useradd", "groupadd"):
- if find_binary(binary, root=context.config.tools()):
+ if context.config.find_binary(binary):
scripts[binary] = (binary, "--root", "/buildroot")
- return finalize_scripts(scripts | dict(helpers), root=context.config.tools())
+ return finalize_scripts(context.config, scripts | dict(helpers))
@contextlib.contextmanager
["/work/configure"],
env=env | config.environment,
sandbox=config.sandbox(
+ binary=None,
tools=False,
mounts=[*sources, Mount(script, "/work/configure", ro=True)],
options=["--dir", "/work/src", "--chdir", "/work/src"]
env=env | context.config.environment,
stdin=sys.stdin,
sandbox=context.sandbox(
+ binary=None,
network=True,
mounts=mounts,
options=["--dir", "/work/src", "--chdir", "/work/src"]
env=env | context.config.environment,
stdin=sys.stdin,
sandbox=context.sandbox(
+ binary=None,
network=True,
mounts=[
*sources,
env=env | context.config.environment,
stdin=sys.stdin,
sandbox=context.sandbox(
+ binary=None,
network=context.config.with_network,
mounts=[
*sources,
env=env | context.config.environment,
stdin=sys.stdin,
sandbox=context.sandbox(
+ binary=None,
network=context.config.with_network,
mounts=[
*sources,
env=env | context.config.environment,
stdin=sys.stdin,
sandbox=context.sandbox(
+ binary=None,
network=context.config.with_network,
mounts=[
*sources,
"-in", certificate,
],
stdout=subprocess.PIPE,
- sandbox=context.sandbox(mounts=[Mount(certificate, certificate, ro=True)]),
+ sandbox=context.sandbox(binary="openssl", mounts=[Mount(certificate, certificate, ro=True)]),
).stdout
for line in output.splitlines():
],
stdout=f,
sandbox=context.sandbox(
+ binary="openssl",
mounts=[
Mount(context.config.secure_boot_key, context.config.secure_boot_key, ro=True),
Mount(context.config.secure_boot_certificate, context.config.secure_boot_certificate, ro=True),
"-d", context.workspace / "pesign",
],
sandbox=context.sandbox(
+ binary="pk12util",
mounts=[
Mount(context.workspace / "secure-boot.p12", context.workspace / "secure-boot.p12", ro=True),
Mount(context.workspace / "pesign", context.workspace / "pesign"),
if (
context.config.secure_boot_sign_tool == SecureBootSignTool.sbsign or
context.config.secure_boot_sign_tool == SecureBootSignTool.auto and
- find_binary("sbsign", root=context.config.tools()) is not None
+ context.config.find_binary("sbsign") is not None
):
with tempfile.NamedTemporaryFile(dir=output.parent, prefix=output.name) as f:
os.chmod(f.name, stat.S_IMODE(input.stat().st_mode))
cmd,
stdout=f,
sandbox=context.sandbox(
+ binary="sbsign",
mounts=mounts,
devices=context.config.secure_boot_key_source.type != KeySource.Type.file,
)
elif (
context.config.secure_boot_sign_tool == SecureBootSignTool.pesign or
context.config.secure_boot_sign_tool == SecureBootSignTool.auto and
- find_binary("pesign", root=context.config.tools()) is not None
+ context.config.find_binary("pesign") is not None
):
pesign_prepare(context)
with tempfile.NamedTemporaryFile(dir=output.parent, prefix=output.name) as f:
],
stdout=f,
sandbox=context.sandbox(
+ binary="pesign",
mounts=[
Mount(context.workspace / "pesign", context.workspace / "pesign", ro=True),
Mount(input, input, ro=True),
if not any(gen_kernel_images(context)) and context.config.bootable == ConfigFeature.auto:
return
- if not find_binary("bootctl", root=context.config.tools()):
+ if not context.config.find_binary("bootctl"):
if context.config.bootable == ConfigFeature.enabled:
die("An EFI bootable image with systemd-boot was requested but bootctl was not found")
return
run(
["bootctl", "install", "--root=/buildroot", "--all-architectures", "--no-variables"],
env={"SYSTEMD_ESP_PATH": "/efi", "SYSTEMD_XBOOTLDR_PATH": "/boot"},
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]),
+ sandbox=context.sandbox(binary="bootctl", mounts=[Mount(context.root, "/buildroot")]),
)
if context.config.shim_bootloader != ShimBootloader.none:
],
stdout=f,
sandbox=context.sandbox(
+ binary="openssl",
mounts=[
Mount(
context.config.secure_boot_certificate,
],
stdout=f,
sandbox=context.sandbox(
+ binary="sbsiglist",
mounts=[Mount(context.workspace / "mkosi.der", context.workspace / "mkosi.der", ro=True)]
),
)
cmd,
stdout=f,
sandbox=context.sandbox(
+ binary="sbvarsign",
mounts=mounts,
devices=context.config.secure_boot_key_source.type != KeySource.Type.file,
),
return None
-def find_grub_binary(binary: str, root: Path = Path("/")) -> Optional[Path]:
+def find_grub_binary(config: Config, binary: str) -> Optional[Path]:
assert "grub" not in binary
- return find_binary(f"grub-{binary}", f"grub2-{binary}", root=root)
+ return config.find_binary(f"grub-{binary}", f"grub2-{binary}")
def want_grub_efi(context: Context) -> bool:
installed = True
for binary in ("mkimage", "bios-setup"):
- if find_grub_binary(binary, root=context.config.tools()):
+ if find_grub_binary(context.config, binary):
continue
if context.config.bootable == ConfigFeature.enabled:
output: Optional[Path] = None,
sbat: Optional[Path] = None,
) -> None:
- mkimage = find_grub_binary("mkimage", root=context.config.tools())
+ mkimage = find_grub_binary(context.config, "mkimage")
assert mkimage
directory = find_grub_directory(context, target=target)
*modules,
],
sandbox=context.sandbox(
+ binary=mkimage,
mounts=[
Mount(directory, "/grub"),
Mount(earlyconfig.name, earlyconfig.name, ro=True),
if not want_grub_bios(context, partitions):
return
- setup = find_grub_binary("bios-setup", root=context.config.tools())
+ setup = find_grub_binary(context.config, "bios-setup")
assert setup
directory = find_grub_directory(context, target="i386-pc")
context.staging / context.config.output_with_format,
],
sandbox=context.sandbox(
+ binary=setup,
mounts=[
Mount(directory, "/grub"),
Mount(context.staging, context.staging),
src, t,
preserve=preserve,
use_subvolumes=config.use_subvolumes,
- tools=config.tools(),
sandbox=config.sandbox,
)
run(
["systemd-dissect", "--copy-from", src, "/", t],
sandbox=config.sandbox(
+ binary="systemd-dissect",
devices=True,
network=True,
mounts=[Mount(src, src, ro=True), Mount(t.parent, t.parent)],
p, context.pkgmngr / "etc/crypto-policies",
preserve=False,
dereference=True,
- tools=context.config.tools(),
sandbox=context.config.sandbox,
)
copy_tree(
d, context.packages,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
copy_tree(
context.install_dir, context.root,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
def gzip_binary(context: Context) -> str:
- return "pigz" if find_binary("pigz", root=context.config.tools()) else "gzip"
+ return "pigz" if context.config.find_binary("pigz") else "gzip"
def fixup_vmlinuz_location(context: Context) -> None:
[python_binary(context.config)],
input=pefile,
stdout=f,
- sandbox=context.sandbox(mounts=[Mount(binary, binary, ro=True)])
+ sandbox=context.sandbox(binary=python_binary(context.config), mounts=[Mount(binary, binary, ro=True)])
)
return output
config.sign_expected_pcr == ConfigFeature.enabled or
(
config.sign_expected_pcr == ConfigFeature.auto and
- find_binary("systemd-measure", "/usr/lib/systemd/systemd-measure", root=config.tools()) is not None
+ config.find_binary("systemd-measure", "/usr/lib/systemd/systemd-measure") is not None
)
)
if not (arch := context.config.architecture.to_efi()):
die(f"Architecture {context.config.architecture} does not support UEFI")
+ if not (ukify := context.config.find_binary("ukify", "/usr/lib/systemd/ukify")):
+ die("Could not find ukify")
+
cmd: list[PathString] = [
- find_binary("ukify", root=context.config.tools()) or "/usr/lib/systemd/ukify",
+ ukify,
"--cmdline", f"@{context.workspace / 'cmdline'}",
"--os-release", f"@{context.root / 'usr/lib/os-release'}",
"--stub", stub,
run(
cmd,
sandbox=context.sandbox(
+ binary=ukify,
mounts=mounts,
devices=context.config.secure_boot_key_source.type != KeySource.Type.file,
),
context.config.unified_kernel_images == ConfigFeature.enabled or (
context.config.unified_kernel_images == ConfigFeature.auto and
systemd_stub_binary(context).exists() and
- find_binary("ukify", "/usr/lib/systemd/ukify", root=context.config.tools()) is not None
+ context.config.find_binary("ukify", "/usr/lib/systemd/ukify") is not None
)
)
def find_entry_token(context: Context) -> str:
if (
"--version" not in run(["kernel-install", "--help"],
- stdout=subprocess.PIPE, sandbox=context.sandbox()).stdout or
+ stdout=subprocess.PIPE, sandbox=context.sandbox(binary="kernel-install")).stdout or
systemd_tool_version(context.config, "kernel-install") < "255.1"
):
return context.config.image_id or context.config.distribution.name
- output = json.loads(run(["kernel-install", "--root=/buildroot", "--json=pretty", "inspect"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot", ro=True)]),
- stdout=subprocess.PIPE,
- env={"SYSTEMD_ESP_PATH": "/efi", "SYSTEMD_XBOOTLDR_PATH": "/boot"}).stdout)
+ output = json.loads(
+ run(
+ ["kernel-install", "--root=/buildroot", "--json=pretty", "inspect"],
+ sandbox=context.sandbox(binary="kernel-install", mounts=[Mount(context.root, "/buildroot", ro=True)]),
+ stdout=subprocess.PIPE,
+ env={"SYSTEMD_ESP_PATH": "/efi", "SYSTEMD_XBOOTLDR_PATH": "/boot"},
+ ).stdout
+ )
+
logging.debug(json.dumps(output, indent=4))
return cast(str, output["EntryToken"])
move_tree(
src, dst,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
return
if not dst:
dst = src.parent / f"{src.name}{compression.extension()}"
+ cmd = compressor_command(context, compression)
+
with complete_step(f"Compressing {src} with {compression}"):
with src.open("rb") as i:
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- run(compressor_command(context, compression), stdin=i, stdout=o, sandbox=context.sandbox())
+ run(cmd, stdin=i, stdout=o, sandbox=context.sandbox(binary=cmd[0]))
def copy_uki(context: Context) -> None:
stdout=o,
# GPG messes with the user's home directory so we run it as the invoking user.
sandbox=context.sandbox(
+ binary="gpg",
mounts=mounts,
options=options,
extra=["setpriv", f"--reuid={INVOKING_USER.uid}", f"--regid={INVOKING_USER.gid}", "--clear-groups"],
def check_tool(config: Config, *tools: PathString, reason: str, hint: Optional[str] = None) -> Path:
- tool = find_binary(*tools, root=config.tools())
+ tool = config.find_binary(*tools)
if not tool:
die(f"Could not find '{tools[0]}' which is required to {reason}.", hint=hint)
with complete_step(f"Running depmod for {kver}"):
run(["depmod", "--all", "--basedir", "/buildroot", kver],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="depmod", mounts=[Mount(context.root, "/buildroot")]))
def run_sysusers(context: Context) -> None:
if context.config.overlay or context.config.output_format in (OutputFormat.sysext, OutputFormat.confext):
return
- if not find_binary("systemd-sysusers", root=context.config.tools()):
+ if not context.config.find_binary("systemd-sysusers"):
logging.warning("systemd-sysusers is not installed, not generating system users")
return
with complete_step("Generating system users"):
run(["systemd-sysusers", "--root=/buildroot"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="systemd-sysusers", mounts=[Mount(context.root, "/buildroot")]))
def run_tmpfiles(context: Context) -> None:
if context.config.overlay or context.config.output_format in (OutputFormat.sysext, OutputFormat.confext):
return
- if not find_binary("systemd-tmpfiles", root=context.config.tools()):
+ if not context.config.find_binary("systemd-tmpfiles"):
logging.warning("systemd-tmpfiles is not installed, not generating volatile files")
return
# systemd-tmpfiles service so we handle those as success as well.
success_exit_status=(0, 65, 73),
sandbox=context.sandbox(
+ binary="systemd-tmpfiles",
mounts=[
Mount(context.root, "/buildroot"),
# systemd uses acl.h to parse ACLs in tmpfiles snippets which uses the host's passwd so we have to
if context.config.overlay or context.config.output_format in (OutputFormat.sysext, OutputFormat.confext):
return
- if not find_binary("systemctl", root=context.config.tools()):
+ if not context.config.find_binary("systemctl"):
logging.warning("systemctl is not installed, not applying presets")
return
with complete_step("Applying presets…"):
run(["systemctl", "--root=/buildroot", "preset-all"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="systemctl", mounts=[Mount(context.root, "/buildroot")]))
run(["systemctl", "--root=/buildroot", "--global", "preset-all"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="systemctl", mounts=[Mount(context.root, "/buildroot")]))
def run_hwdb(context: Context) -> None:
if context.config.overlay or context.config.output_format in (OutputFormat.sysext, OutputFormat.confext):
return
- if not find_binary("systemd-hwdb", root=context.config.tools()):
+ if not context.config.find_binary("systemd-hwdb"):
logging.warning("systemd-hwdb is not installed, not generating hwdb")
return
with complete_step("Generating hardware database"):
run(["systemd-hwdb", "--root=/buildroot", "--usr", "--strict", "update"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="systemd-hwdb", mounts=[Mount(context.root, "/buildroot")]))
# Remove any existing hwdb in /etc in favor of the one we just put in /usr.
(context.root / "etc/udev/hwdb.bin").unlink(missing_ok=True)
if context.config.overlay or context.config.output_format.is_extension_image():
return
- if not find_binary("systemd-firstboot", root=context.config.tools()):
+ if not context.config.find_binary("systemd-firstboot"):
logging.warning("systemd-firstboot is not installed, not applying first boot settings")
return
password, hashed = context.config.root_password or (None, False)
if password and not hashed:
- password = run(["openssl", "passwd", "-stdin", "-6"],
- sandbox=context.sandbox(), input=password, stdout=subprocess.PIPE).stdout.strip()
+ password = run(
+ ["openssl", "passwd", "-stdin", "-6"],
+ sandbox=context.sandbox(binary="openssl"),
+ input=password,
+ stdout=subprocess.PIPE,
+ ).stdout.strip()
settings = (
("--locale", "firstboot.locale", context.config.locale),
with complete_step("Applying first boot settings"):
run(["systemd-firstboot", "--root=/buildroot", "--force", *options],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]))
+ sandbox=context.sandbox(binary="systemd-firstboot", mounts=[Mount(context.root, "/buildroot")]))
# Initrds generally don't ship with only /usr so there's not much point in putting the credentials in
# /usr/lib/credstore.
with complete_step(f"Relabeling files using {policy} policy"):
run(["setfiles", "-mFr", "/buildroot", "-c", binpolicy, fc, "/buildroot"],
- sandbox=context.sandbox(mounts=[Mount(context.root, "/buildroot")]),
+ sandbox=context.sandbox(binary="setfiles", mounts=[Mount(context.root, "/buildroot")]),
check=context.config.selinux_relabel == ConfigFeature.enabled)
final, build, manifest = cache_tree_paths(context.config)
with complete_step("Installing cache copies"):
- rmtree(final, tools=context.config.tools(), sandbox=context.sandbox)
+ rmtree(final, sandbox=context.sandbox)
# We only use the cache-overlay directory for caching if we have a base tree, otherwise we just
# cache the root directory.
move_tree(
context.workspace / "cache-overlay", final,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
else:
)
if need_build_overlay(context.config) and (context.workspace / "build-overlay").exists():
- rmtree(build, tools=context.config.tools(), sandbox=context.sandbox)
+ rmtree(build, sandbox=context.sandbox)
move_tree(
context.workspace / "build-overlay", build,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
logging.info("Cache manifest mismatch, not reusing cached images")
if ARG_DEBUG.get():
run(["diff", manifest, "-"], input=new, check=False,
- sandbox=config.sandbox(mounts=[Mount(manifest, manifest)]))
+ sandbox=config.sandbox(binary="diff", mounts=[Mount(manifest, manifest)]))
return False
else:
copy_tree(
final, context.root,
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
stdout=subprocess.PIPE,
env=context.config.environment,
sandbox=context.sandbox(
+ binary="systemd-repart",
devices=(
not context.config.repart_offline or
context.config.verity_key_source.type != KeySource.Type.file
cmdline + ["--definitions", r],
env=env,
sandbox=context.sandbox(
+ binary="systemd-repart",
devices=(
not context.config.repart_offline or
context.config.verity_key_source.type != KeySource.Type.file
move_tree(
f, context.config.output_dir_or_cwd(),
use_subvolumes=context.config.use_subvolumes,
- tools=context.config.tools(),
sandbox=context.sandbox,
)
workspace = Path(tempfile.mkdtemp(dir=config.workspace_dir_or_default(), prefix="mkosi-workspace"))
# Discard setuid/setgid bits as these are inherited and can leak into the image.
workspace.chmod(stat.S_IMODE(workspace.stat().st_mode) & ~(stat.S_ISGID|stat.S_ISUID))
- stack.callback(lambda: rmtree(workspace, tools=config.tools(), sandbox=config.sandbox))
+ stack.callback(lambda: rmtree(workspace, sandbox=config.sandbox))
(workspace / "tmp").mkdir(mode=0o1777)
with scopedenv({"TMPDIR" : os.fspath(workspace / "tmp")}):
with umask(~0o755):
dst.mkdir(parents=True, exist_ok=True)
- def sandbox(*, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]:
- return context.sandbox(mounts=[*mounts, *exclude])
+ def sandbox(
+ *,
+ binary: Optional[PathString],
+ mounts: Sequence[Mount] = (),
+ ) -> AbstractContextManager[list[PathString]]:
+ return context.sandbox(binary=binary, mounts=[*mounts, *exclude])
copy_tree(
src, dst,
- tools=context.config.tools(),
preserve=False,
sandbox=sandbox,
)
],
# Supply files via stdin so we don't clutter --debug run output too much
input="\n".join([str(root), *(os.fspath(p) for p in root.rglob("*") if p.is_dir())]),
- sandbox=config.sandbox(mounts=[Mount(root, root)]),
+ sandbox=config.sandbox(binary="setfacl", mounts=[Mount(root, root)]),
)
# getfacl complains about absolute paths so make sure we pass a relative one.
if root.exists():
- sandbox = config.sandbox(mounts=[Mount(root, root)], options=["--chdir", root])
+ sandbox = config.sandbox(binary="getfacl", mounts=[Mount(root, root)], options=["--chdir", root])
has_acl = f"user:{uid}:rwx" in run(["getfacl", "-n", "."], sandbox=sandbox, stdout=subprocess.PIPE).stdout
if not has_acl and not always:
],
stdin=sys.stdin,
env=config.environment,
- sandbox=config.sandbox(network=True, devices=True, mounts=[Mount(fname, fname)]),
+ sandbox=config.sandbox(
+ binary="systemd-repart",
+ network=True,
+ devices=True,
+ mounts=[Mount(fname, fname)],
+ ),
)
if config.output_format == OutputFormat.directory:
stdout=sys.stdout,
env=os.environ | config.environment,
log=False,
- sandbox=config.sandbox(devices=True, network=True, relaxed=True),
+ sandbox=config.sandbox(binary="systemd-nspawn", devices=True, network=True, relaxed=True),
)
):
die(f"Must be root to run the {args.verb} command")
- if (tool_path := find_binary(tool, root=config.tools())) is None:
+ if (tool_path := config.find_binary(tool)) is None:
die(f"Failed to find {tool}")
if config.ephemeral:
env=os.environ | config.environment,
log=False,
preexec_fn=become_root,
- sandbox=config.sandbox(network=True, devices=config.output_format == OutputFormat.disk, relaxed=True),
+ sandbox=config.sandbox(
+ binary=tool_path,
+ network=True,
+ devices=config.output_format == OutputFormat.disk,
+ relaxed=True,
+ ),
)
def run_serve(args: Args, config: Config) -> None:
"""Serve the output directory via a tiny HTTP server"""
- run([python_binary(config), "-m", "http.server", "8081"],
+ run(
+ [python_binary(config), "-m", "http.server", "8081"],
stdin=sys.stdin, stdout=sys.stdout,
- sandbox=config.sandbox(network=True, relaxed=True, options=["--chdir", config.output_dir_or_cwd()]))
+ sandbox=config.sandbox(
+ binary=python_binary(config),
+ network=True,
+ relaxed=True,
+ options=["--chdir", config.output_dir_or_cwd()],
+ ),
+ )
def generate_key_cert_pair(args: Args) -> None:
@contextlib.contextmanager
def prepend_to_environ_path(config: Config) -> Iterator[None]:
- if config.tools_tree or not config.extra_search_paths:
+ if not config.extra_search_paths:
yield
return
["/work/clean"],
env=env | config.environment,
sandbox=config.sandbox(
+ binary=None,
tools=False,
mounts=[
*sources,
],
stdout=f,
# Make sure tar uses user/group information from the root directory instead of the host.
- sandbox=sandbox(mounts=[Mount(src, src, ro=True), *finalize_passwd_mounts(src)]),
+ sandbox=sandbox(binary="tar", mounts=[Mount(src, src, ro=True), *finalize_passwd_mounts(src)]),
)
],
stdin=f,
sandbox=sandbox(
+ binary="tar",
# Make sure tar uses user/group information from the root directory instead of the host.
mounts=[Mount(src, src, ro=True), Mount(dst, dst), *finalize_passwd_mounts(dst)]
),
],
input="\0".join(os.fspath(f.relative_to(src)) for f in files),
stdout=f,
- sandbox=sandbox(mounts=[Mount(src, src, ro=True), *finalize_passwd_mounts(src)]),
+ sandbox=sandbox(binary="cpio", mounts=[Mount(src, src, ro=True), *finalize_passwd_mounts(src)]),
)
stdout=sys.stdout,
env=os.environ | config.environment,
log=False,
- sandbox=config.sandbox(devices=True, network=True, relaxed=True),
+ sandbox=config.sandbox(binary="systemd-repart", devices=True, network=True, relaxed=True),
)
j = cls._load_json(s)
return dataclasses.replace(cls.default(), **j)
+ def find_binary(self, *names: PathString, tools: bool = True) -> Optional[Path]:
+ return find_binary(*names, root=self.tools() if tools else Path("/"), extra=self.extra_search_paths)
+
def sandbox(
self,
*,
+ binary: Optional[PathString],
network: bool = False,
devices: bool = False,
relaxed: bool = False,
extra: Sequence[PathString] = (),
) -> AbstractContextManager[list[PathString]]:
mounts = [
- *[Mount(d, d, ro=True) for d in self.extra_search_paths if not relaxed and not self.tools_tree],
*([Mount(p, "/proxy.cacert", ro=True)] if (p := self.proxy_peer_certificate) else []),
*([Mount(p, "/proxy.clientcert", ro=True)] if (p := self.proxy_client_certificate) else []),
*([Mount(p, "/proxy.clientkey", ro=True)] if (p := self.proxy_client_key) else []),
*mounts,
]
+ if (
+ binary and
+ (path := self.find_binary(binary, tools=tools)) and
+ any(path.is_relative_to(d) for d in self.extra_search_paths)
+ ):
+ tools = False
+ mounts += [Mount(d, d, ro=True) for d in self.extra_search_paths if not relaxed]
+
return sandbox_cmd(
network=network,
devices=devices,
return None
policy = run(["sh", "-c", f". {selinux} && echo $SELINUXTYPE"],
- sandbox=config.sandbox(mounts=[Mount(selinux, selinux, ro=True)]),
+ sandbox=config.sandbox(binary="sh", mounts=[Mount(selinux, selinux, ro=True)]),
stdout=subprocess.PIPE).stdout.strip()
if not policy:
if fatal and config.selinux_relabel == ConfigFeature.enabled:
die("SELinux relabel is requested but no selinux policy is configured in /etc/selinux/config")
return None
- if not find_binary("setfiles", root=config.tools()):
+ if not config.find_binary("setfiles"):
if fatal and config.selinux_relabel == ConfigFeature.enabled:
die("SELinux relabel is requested but setfiles is not installed")
return None
run(
[tool, "--version"],
stdout=subprocess.PIPE,
- sandbox=config.sandbox()
+ sandbox=config.sandbox(binary=tool),
).stdout.split()[2].strip("()").removeprefix("v")
)
make_tree(
self.root,
use_subvolumes=self.config.use_subvolumes,
- tools=config.tools(),
sandbox=config.sandbox,
)
def sandbox(
self,
*,
+ binary: Optional[PathString],
network: bool = False,
devices: bool = False,
scripts: Optional[Path] = None,
]
return self.config.sandbox(
+ binary=binary,
network=network,
devices=devices,
scripts=scripts,
path = Path(deb)
with open(path, "rb") as i, tempfile.NamedTemporaryFile() as o:
- run(["dpkg-deb", "--fsys-tarfile", "/dev/stdin"], stdin=i, stdout=o, sandbox=context.sandbox())
+ run(
+ ["dpkg-deb", "--fsys-tarfile", "/dev/stdin"],
+ stdin=i,
+ stdout=o,
+ sandbox=context.sandbox(binary="dpkg-deb"),
+ )
extract_tar(Path(o.name), context.root, log=False, sandbox=context.sandbox)
# Finally, run apt to properly install packages in the chroot without having to worry that maintainer
from mkosi.installer.zypper import Zypper
from mkosi.log import die
from mkosi.mounts import finalize_crypto_mounts
-from mkosi.run import find_binary, run
+from mkosi.run import run
from mkosi.sandbox import Mount
from mkosi.util import listify, sort_packages
@classmethod
def package_manager(cls, config: Config) -> type[PackageManager]:
- if find_binary("zypper", root=config.tools()):
+ if config.find_binary("zypper"):
return Zypper
else:
return Dnf
@classmethod
def createrepo(cls, context: Context) -> None:
- if find_binary("zypper", root=context.config.tools()):
+ if context.config.find_binary("zypper"):
Zypper.createrepo(context)
else:
Dnf.createrepo(context)
@classmethod
def setup(cls, context: Context) -> None:
- zypper = find_binary("zypper", root=context.config.tools())
+ zypper = context.config.find_binary("zypper")
if zypper:
Zypper.setup(context, cls.repositories(context))
else:
@classmethod
def install_packages(cls, context: Context, packages: Sequence[str], apivfs: bool = True) -> None:
- if find_binary("zypper", root=context.config.tools()):
+ if context.config.find_binary("zypper"):
Zypper.invoke(
context,
"install",
@classmethod
def remove_packages(cls, context: Context, packages: Sequence[str]) -> None:
- if find_binary("zypper", root=context.config.tools()):
+ if context.config.find_binary("zypper"):
Zypper.invoke(context, "remove", ["--clean-deps", *sort_packages(packages)], apivfs=True)
else:
Dnf.invoke(context, "remove", packages, apivfs=True)
@classmethod
@listify
def repositories(cls, context: Context) -> Iterable[RpmRepository]:
- zypper = find_binary("zypper", root=context.config.tools())
+ zypper = context.config.find_binary("zypper")
release = context.config.release
if release == "leap":
f"{repourl}/repodata/repomd.xml",
],
sandbox=context.sandbox(
+ binary="curl",
network=True,
mounts=[Mount(d, d), *finalize_crypto_mounts(context.config)],
),
("dpkg", ["var/lib/dpkg"]),
(executable, [f"var/lib/{subdir}", f"var/cache/{subdir}"])):
if context.config.clean_package_metadata == ConfigFeature.enabled or not find_binary(tool, root=context.root):
- rmtree(*(context.root / p for p in paths if (context.root / p).exists()),
- tools=context.config.tools(), sandbox=context.sandbox)
+ rmtree(*(context.root / p for p in paths if (context.root / p).exists()), sandbox=context.sandbox)
from mkosi.installer import PackageManager
from mkosi.log import die
from mkosi.mounts import finalize_source_mounts
-from mkosi.run import find_binary, run
+from mkosi.run import run
from mkosi.sandbox import Mount, apivfs_cmd
from mkosi.types import _FILE, CompletedProcess, PathString
from mkosi.util import umask
"-o", "Dir::State=/var/lib/apt",
"-o", "Dir::Log=/var/log/apt",
"-o", "Dir::State::Status=/buildroot/var/lib/dpkg/status",
- "-o", f"Dir::Bin::DPkg={find_binary('dpkg', root=context.config.tools())}",
+ "-o", f"Dir::Bin::DPkg={context.config.find_binary('dpkg')}",
"-o", "Debug::NoLocking=true",
"-o", "DPkg::Options::=--root=/buildroot",
"-o", "DPkg::Options::=--force-unsafe-io",
cls.cmd(context, "apt-get") + [operation, *arguments],
sandbox=(
context.sandbox(
+ binary="apt-get",
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources, *mounts],
options=["--dir", "/work/src", "--chdir", "/work/src"],
run(
["reprepro", "includedeb", "mkosi"] + [d.name for d in context.packages.glob("*.deb")],
sandbox=context.sandbox(
+ binary="reprepro",
mounts=[Mount(context.packages, context.packages)],
options=["--chdir", context.packages],
),
from mkosi.installer.rpm import RpmRepository, rpm_cmd
from mkosi.log import ARG_DEBUG
from mkosi.mounts import finalize_source_mounts
-from mkosi.run import find_binary, run
+from mkosi.run import run
from mkosi.sandbox import Mount, apivfs_cmd
from mkosi.types import _FILE, CompletedProcess, PathString
def executable(cls, config: Config) -> str:
# Allow the user to override autodetection with an environment variable
dnf = config.environment.get("MKOSI_DNF")
- root = config.tools()
-
- return Path(dnf or find_binary("dnf5", root=root) or find_binary("dnf", root=root) or "yum").name
+ return Path(dnf or config.find_binary("dnf5") or config.find_binary("dnf") or "yum").name
@classmethod
def subdir(cls, config: Config) -> Path:
cls.cmd(context) + [operation,*arguments],
sandbox=(
context.sandbox(
+ binary=cls.executable(context.config),
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
@classmethod
def createrepo(cls, context: Context) -> None:
run(["createrepo_c", context.packages],
- sandbox=context.sandbox(mounts=[Mount(context.packages, context.packages)]))
+ sandbox=context.sandbox(binary="createrepo_c", mounts=[Mount(context.packages, context.packages)]))
(context.pkgmngr / "etc/yum.repos.d/mkosi-local.repo").write_text(
textwrap.dedent(
cls.cmd(context) + [operation, *arguments],
sandbox=(
context.sandbox(
+ binary="pacman",
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
context.packages / "mkosi.db.tar",
*sorted(context.packages.glob("*.pkg.tar*"), key=lambda p: GenericVersion(Path(p).name))
],
- sandbox=context.sandbox(mounts=[Mount(context.packages, context.packages)]),
+ sandbox=context.sandbox(binary="repo-add", mounts=[Mount(context.packages, context.packages)]),
)
(context.pkgmngr / "etc/mkosi-local.conf").write_text(
(confdir / "macros.dbpath").write_text(f"%_dbpath {dbpath}")
plugindir = Path(run(["rpm", "--eval", "%{__plugindir}"],
- sandbox=context.sandbox(), stdout=subprocess.PIPE).stdout.strip())
+ sandbox=context.sandbox(binary="rpm"), stdout=subprocess.PIPE).stdout.strip())
if (plugindir := context.config.tools() / plugindir.relative_to("/")).exists():
with (confdir / "macros.disable-plugins").open("w") as f:
for plugin in plugindir.iterdir():
cls.cmd(context) + [operation, *arguments],
sandbox=(
context.sandbox(
+ binary="zypper",
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
@classmethod
def createrepo(cls, context: Context) -> None:
run(["createrepo_c", context.packages],
- sandbox=context.sandbox(mounts=[Mount(context.packages, context.packages)]))
+ sandbox=context.sandbox(binary="createrepo_c", mounts=[Mount(context.packages, context.packages)]))
(context.pkgmngr / "etc/zypp/repos.d/mkosi-local.repo").write_text(
textwrap.dedent(
info += run(
["modinfo", "--basedir", "/buildroot", "--set-version", kver, "--null", *chunk],
stdout=subprocess.PIPE,
- sandbox=sandbox(mounts=[Mount(root, "/buildroot", ro=True)]),
+ sandbox=sandbox(binary="modinfo", mounts=[Mount(root, "/buildroot", ro=True)]),
).stdout.strip()
log_step("Calculating required kernel modules and firmware")
"--queryformat", r"%{NEVRA}\t%{SOURCERPM}\t%{NAME}\t%{ARCH}\t%{LONGSIZE}\t%{INSTALLTIME}\n",
],
stdout=subprocess.PIPE,
- sandbox=self.context.sandbox(mounts=[Mount(self.context.root, "/buildroot")]),
+ sandbox=self.context.sandbox(binary="rpm", mounts=[Mount(self.context.root, "/buildroot")]),
)
packages = sorted(c.stdout.splitlines())
],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
- sandbox=self.context.sandbox(mounts=[Mount(self.context.root, "/buildroot", ro=True)]),
+ sandbox=self.context.sandbox(
+ binary="rpm",
+                mounts=[Mount(self.context.root, "/buildroot", ro=True)],
+ ),
)
changelog = c.stdout.strip()
source = SourcePackageManifest(srpm, changelog)
r'${Package}\t${source:Package}\t${Version}\t${Architecture}\t${Installed-Size}\t${db-fsys:Last-Modified}\n',
],
stdout=subprocess.PIPE,
- sandbox=self.context.sandbox(mounts=[Mount(self.context.root, "/buildroot", ro=True)]),
+ sandbox=self.context.sandbox(
+ binary="dpkg-query",
+ mounts=[Mount(self.context.root, "/buildroot", ro=True)],
+ ),
)
packages = sorted(c.stdout.splitlines())
["systemd-repart", "--json=short", image],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
- sandbox=sandbox(mounts=[Mount(image, image, ro=True)]),
+ sandbox=sandbox(binary="systemd-repart", mounts=[Mount(image, image, ro=True)]),
).stdout
)
return [Partition.from_dict(d) for d in output]
import sys
import tempfile
import uuid
-from collections.abc import Iterator
+from collections.abc import Iterator, Sequence
from pathlib import Path
from typing import NamedTuple, Optional
@classmethod
def identify(cls, config: Config, path: Path) -> "KernelType":
- if not find_binary("bootctl", root=config.tools()):
+ if not config.find_binary("bootctl"):
logging.warning("bootctl is not installed, assuming 'unknown' kernel type")
return KernelType.unknown
type = run(
["bootctl", "kernel-identify", path],
stdout=subprocess.PIPE,
- sandbox=config.sandbox(mounts=[Mount(path, path, ro=True)]),
+ sandbox=config.sandbox(binary="bootctl", mounts=[Mount(path, path, ro=True)]),
).stdout.strip()
try:
binaries = [f"qemu-system-{config.architecture.to_qemu()}"]
binaries += ["qemu", "qemu-kvm"] if config.architecture.is_native() else []
for binary in binaries:
- if find_binary(binary, root=config.tools()) is not None:
+ if config.find_binary(binary) is not None:
return binary
die("Couldn't find QEMU/KVM binary")
with tempfile.TemporaryDirectory(prefix="mkosi-swtpm") as state:
# swtpm_setup is noisy and doesn't have a --quiet option so we pipe it's stdout to /dev/null.
run(["swtpm_setup", "--tpm-state", state, "--tpm2", "--pcr-banks", "sha256", "--config", "/dev/null"],
- sandbox=config.sandbox(mounts=[Mount(state, state)]),
+ sandbox=config.sandbox(binary="swtpm_setup", mounts=[Mount(state, state)]),
stdout=None if ARG_DEBUG.get() else subprocess.DEVNULL)
cmdline = ["swtpm", "socket", "--tpm2", "--tpmstate", f"dir={state}"]
with spawn(
cmdline,
pass_fds=(sock.fileno(),),
- sandbox=config.sandbox(mounts=[Mount(state, state)]),
+ sandbox=config.sandbox(binary="swtpm", mounts=[Mount(state, state)]),
) as (proc, innerpid):
allocate_scope(
config,
kill(proc, innerpid, signal.SIGTERM)
-def find_virtiofsd(*, tools: Path = Path("/")) -> Optional[Path]:
- if p := find_binary("virtiofsd", root=tools):
+def find_virtiofsd(*, root: Path = Path("/"), extra: Sequence[Path] = ()) -> Optional[Path]:
+ if p := find_binary("virtiofsd", root=root, extra=extra):
return p
- if (p := tools / "usr/libexec/virtiofsd").exists():
- return Path("/") / p.relative_to(tools)
+ if (p := root / "usr/libexec/virtiofsd").exists():
+ return Path("/") / p.relative_to(root)
- if (p := tools / "usr/lib/virtiofsd").exists():
- return Path("/") / p.relative_to(tools)
+ if (p := root / "usr/lib/virtiofsd").exists():
+ return Path("/") / p.relative_to(root)
return None
def start_virtiofsd(config: Config, directory: PathString, *, name: str, selinux: bool = False) -> Iterator[Path]:
uidmap = Path(directory).stat().st_uid == INVOKING_USER.uid
- virtiofsd = find_virtiofsd(tools=config.tools())
+ virtiofsd = find_virtiofsd(root=config.tools(), extra=config.extra_search_paths)
if virtiofsd is None:
die("virtiofsd must be installed to boot directory images or use RuntimeTrees= with mkosi qemu")
group=INVOKING_USER.gid if uidmap else None,
preexec_fn=become_root if not uidmap else None,
sandbox=config.sandbox(
+ binary=virtiofsd,
mounts=[Mount(directory, directory)],
options=["--uid", "0", "--gid", "0", "--cap-add", "all"],
),
def start_journal_remote(config: Config, sockfd: int) -> Iterator[None]:
assert config.forward_journal
- bin = find_binary("systemd-journal-remote", "/usr/lib/systemd/systemd-journal-remote", root=config.tools())
+ bin = config.find_binary("systemd-journal-remote", "/usr/lib/systemd/systemd-journal-remote")
if not bin:
die("systemd-journal-remote must be installed to forward logs from the virtual machine")
"--split-mode", "none" if config.forward_journal.suffix == ".journal" else "host",
],
pass_fds=(sockfd,),
- sandbox=config.sandbox(mounts=[Mount(config.forward_journal.parent, config.forward_journal.parent)]),
+ sandbox=config.sandbox(
+ binary=bin,
+ mounts=[Mount(config.forward_journal.parent, config.forward_journal.parent)],
+ ),
user=config.forward_journal.parent.stat().st_uid if INVOKING_USER.invoked_as_root else None,
group=config.forward_journal.parent.stat().st_gid if INVOKING_USER.invoked_as_root else None,
# If all logs go into a single file, disable compact mode to allow for journal files exceeding 4G.
src, tmp,
preserve=config.output_format == OutputFormat.directory,
use_subvolumes=config.use_subvolumes,
- tools=config.tools(),
sandbox=config.sandbox,
)
if config.output_format == OutputFormat.directory:
become_root()
- rmtree(tmp, tools=config.tools(), sandbox=config.sandbox)
+ rmtree(tmp, sandbox=config.sandbox)
fork_and_wait(rm)
def qemu_version(config: Config) -> GenericVersion:
- return GenericVersion(run([find_qemu_binary(config), "--version"],
- stdout=subprocess.PIPE, sandbox=config.sandbox()).stdout.split()[3])
+ binary = find_qemu_binary(config)
+ return GenericVersion(
+ run(
+ [binary, "--version"],
+ stdout=subprocess.PIPE,
+ sandbox=config.sandbox(binary=binary),
+ ).stdout.split()[3]
+ )
def want_scratch(config: Config) -> bool:
return config.runtime_scratch == ConfigFeature.enabled or (
config.runtime_scratch == ConfigFeature.auto and
- find_binary(f"mkfs.{config.distribution.filesystem()}", root=config.tools()) is not None
+ config.find_binary(f"mkfs.{config.distribution.filesystem()}") is not None
)
run(
[f"mkfs.{fs}", "-L", "scratch", *extra.split(), scratch.name],
stdout=subprocess.DEVNULL,
- sandbox=config.sandbox(mounts=[Mount(scratch.name, scratch.name)]),
+        sandbox=config.sandbox(binary=f"mkfs.{fs}", mounts=[Mount(scratch.name, scratch.name)]),
)
yield Path(scratch.name)
"--loglevel", "WARNING",
],
sandbox=config.sandbox(
+ binary="virt-fw-vars",
mounts=[
Mount(ovmf_vars.name, ovmf_vars.name),
Mount(config.secure_boot_certificate, config.secure_boot_certificate, ro=True),
"--offline=yes",
image,
],
- sandbox=config.sandbox(mounts=[Mount(image, image)]),
+ sandbox=config.sandbox(binary="systemd-repart", mounts=[Mount(image, image)]),
)
],
foreground=False,
env=os.environ | config.environment,
- sandbox=config.sandbox(relaxed=True),
+ sandbox=config.sandbox(binary="busctl", relaxed=True),
)
],
foreground=False,
env=os.environ | config.environment,
- sandbox=config.sandbox(relaxed=True),
+ sandbox=config.sandbox(binary="busctl", relaxed=True),
# systemd-machined might not be installed so let's ignore any failures unless running in debug mode.
check=ARG_DEBUG.get(),
stderr=None if ARG_DEBUG.get() else subprocess.DEVNULL,
"--copy-from", src,
fname,
],
- sandbox=config.sandbox(mounts=[Mount(fname.parent, fname.parent), Mount(src, src, ro=True)]),
+ sandbox=config.sandbox(
+ binary="systemd-repart",
+ mounts=[Mount(fname.parent, fname.parent), Mount(src, src, ro=True)],
+ ),
)
stack.callback(lambda: fname.unlink())
else:
if (
firmware.is_uefi() and
config.qemu_swtpm != ConfigFeature.disabled and
- find_binary("swtpm", root=config.tools()) is not None
+ config.find_binary("swtpm") is not None
):
sock = stack.enter_context(start_swtpm(config))
cmdline += ["-chardev", f"socket,id=chrtpm,path={sock}",
env=os.environ | config.environment,
log=False,
foreground=True,
- sandbox=config.sandbox(network=True, devices=True, relaxed=True),
+ sandbox=config.sandbox(binary=cmdline[0], network=True, devices=True, relaxed=True),
) as (proc, innerpid):
# We have to close these before we wait for qemu otherwise we'll deadlock as qemu will never exit.
for fd in qemu_device_fds.values():
stdout=sys.stdout,
env=os.environ | config.environment,
log=False,
- sandbox=config.sandbox(network=True, devices=True, relaxed=True),
+ sandbox=config.sandbox(binary="ssh", network=True, devices=True, relaxed=True),
)
version is installed on the host system. If this option is not used,
but the `mkosi.tools/` directory is found in the local directory it is
automatically used for this purpose with the root directory as target.
- Note that when looking up binaries in `--tools-tree=`, only `/usr/bin`
- and `/usr/sbin` are considered. Specifically, paths specified by
- `--extra-search-path=` are ignored when looking up binaries in the
- given tools tree.
+
+: Note that if a binary is found in any of the paths configured with
+  `ExtraSearchPaths=`, the binary will be executed on the host.
: If set to `default`, mkosi will automatically add an extra tools tree
image and use it as the tools tree. The following table shows for
import contextlib
import errno
import fcntl
+import itertools
import logging
import os
import queue
make_foreground_process(new_process_group=False)
-def find_binary(*names: PathString, root: Path = Path("/")) -> Optional[Path]:
+def find_binary(*names: PathString, root: Path = Path("/"), extra: Sequence[Path] = ()) -> Optional[Path]:
if root != Path("/"):
- path = ":".join(os.fspath(p) for p in (root / "usr/bin", root / "usr/sbin"))
+ path = ":".join(
+ itertools.chain(
+ (os.fspath(p) for p in extra),
+ (os.fspath(p) for p in (root / "usr/bin", root / "usr/sbin")),
+ )
+ )
else:
path = os.environ["PATH"]
for name in names:
- if Path(name).is_absolute():
+ if any(Path(name).is_relative_to(d) for d in extra):
+ pass
+ elif Path(name).is_absolute():
name = root / Path(name).relative_to("/")
elif "/" in str(name):
name = root / name
class SandboxProtocol(Protocol):
- def __call__(self, *, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]: ...
+ def __call__(
+ self,
+ *,
+ binary: Optional[PathString],
+ mounts: Sequence[Mount] = ()
+ ) -> AbstractContextManager[list[PathString]]: ...
-def nosandbox(*, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]:
+def nosandbox(
+ *,
+ binary: Optional[PathString],
+ mounts: Sequence[Mount] = (),
+) -> AbstractContextManager[list[PathString]]:
return contextlib.nullcontext([])
from mkosi.config import ConfigFeature
from mkosi.log import ARG_DEBUG, die
-from mkosi.run import find_binary, run
+from mkosi.run import run
from mkosi.sandbox import Mount, SandboxProtocol, nosandbox
from mkosi.types import PathString
from mkosi.versioncomp import GenericVersion
def statfs(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> str:
- return run(["stat", "--file-system", "--format", "%T", path],
- sandbox=sandbox(mounts=[Mount(path, path, ro=True)]), stdout=subprocess.PIPE).stdout.strip()
+ return run(
+ ["stat", "--file-system", "--format", "%T", path],
+ stdout=subprocess.PIPE,
+ sandbox=sandbox(binary="stat", mounts=[Mount(path, path, ro=True)]),
+ ).stdout.strip()
def is_subvolume(path: Path, *, sandbox: SandboxProtocol = nosandbox) -> bool:
def cp_version(*, sandbox: SandboxProtocol = nosandbox) -> GenericVersion:
return GenericVersion(
- run(["cp", "--version"], sandbox=sandbox(), stdout=subprocess.PIPE).stdout.splitlines()[0].split()[3]
+ run(
+ ["cp", "--version"],
+ sandbox=sandbox(binary="cp"),
+ stdout=subprocess.PIPE,
+ ).stdout.splitlines()[0].split()[3]
)
path: Path,
*,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
- tools: Path = Path("/"),
sandbox: SandboxProtocol = nosandbox,
) -> Path:
- if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
- die("Subvolumes requested but the btrfs command was not found")
-
if statfs(path.parent, sandbox=sandbox) != "btrfs":
if use_subvolumes == ConfigFeature.enabled:
die(f"Subvolumes requested but {path} is not located on a btrfs filesystem")
path.mkdir()
return path
- if use_subvolumes != ConfigFeature.disabled and find_binary("btrfs", root=tools) is not None:
+ if use_subvolumes != ConfigFeature.disabled:
result = run(["btrfs", "subvolume", "create", path],
- sandbox=sandbox(mounts=[Mount(path.parent, path.parent)]),
+ sandbox=sandbox(binary="btrfs", mounts=[Mount(path.parent, path.parent)]),
check=use_subvolumes == ConfigFeature.enabled).returncode
else:
result = 1
preserve: bool = True,
dereference: bool = False,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
- tools: Path = Path("/"),
sandbox: SandboxProtocol = nosandbox,
) -> Path:
- subvolume = (use_subvolumes == ConfigFeature.enabled or
- use_subvolumes == ConfigFeature.auto and find_binary("btrfs", root=tools) is not None)
-
- if use_subvolumes == ConfigFeature.enabled and not find_binary("btrfs", root=tools):
- die("Subvolumes requested but the btrfs command was not found")
-
copy: list[PathString] = [
"cp",
"--recursive",
# Subvolumes always have inode 256 so we can use that to check if a directory is a subvolume.
if (
- not subvolume or
+ use_subvolumes == ConfigFeature.disabled or
not preserve or
not is_subvolume(src, sandbox=sandbox) or
- not find_binary("btrfs", root=tools) or
(dst.exists() and any(dst.iterdir()))
):
with (
if not preserve
else contextlib.nullcontext()
):
- run(copy, sandbox=sandbox(mounts=mounts))
+ run(copy, sandbox=sandbox(binary="cp", mounts=mounts))
return dst
# btrfs can't snapshot to an existing directory so make sure the destination does not exist.
if dst.exists():
dst.rmdir()
- result = run(["btrfs", "subvolume", "snapshot", src, dst],
- check=use_subvolumes == ConfigFeature.enabled, sandbox=sandbox(mounts=mounts)).returncode
+ result = run(
+ ["btrfs", "subvolume", "snapshot", src, dst],
+ check=use_subvolumes == ConfigFeature.enabled,
+ sandbox=sandbox(binary="btrfs", mounts=mounts),
+ ).returncode
+
if result != 0:
with (
preserve_target_directories_stat(src, dst)
if not preserve
else contextlib.nullcontext()
):
- run(copy, sandbox=sandbox(mounts=mounts))
+ run(copy, sandbox=sandbox(binary="cp", mounts=mounts))
return dst
-def rmtree(*paths: Path, tools: Path = Path("/"), sandbox: SandboxProtocol = nosandbox) -> None:
+def rmtree(*paths: Path, sandbox: SandboxProtocol = nosandbox) -> None:
if not paths:
return
- if (
- find_binary("btrfs", root=tools) and
- (subvolumes := sorted({p for p in paths if is_subvolume(p, sandbox=sandbox)}))
- ):
+ if subvolumes := sorted({p for p in paths if is_subvolume(p, sandbox=sandbox)}):
# Silence and ignore failures since when not running as root, this will fail with a permission error unless the
# btrfs filesystem is mounted with user_subvol_rm_allowed.
run(["btrfs", "subvolume", "delete", *subvolumes],
check=False,
- sandbox=sandbox(mounts=[Mount(p.parent, p.parent) for p in subvolumes]),
+ sandbox=sandbox(binary="btrfs", mounts=[Mount(p.parent, p.parent) for p in subvolumes]),
stdout=subprocess.DEVNULL if not ARG_DEBUG.get() else None,
stderr=subprocess.DEVNULL if not ARG_DEBUG.get() else None)
filtered = sorted({p for p in paths if p.exists()})
if filtered:
run(["rm", "-rf", "--", *filtered],
- sandbox=sandbox(mounts=[Mount(p.parent, p.parent) for p in filtered]))
+ sandbox=sandbox(binary="rm", mounts=[Mount(p.parent, p.parent) for p in filtered]))
def move_tree(
dst: Path,
*,
use_subvolumes: ConfigFeature = ConfigFeature.disabled,
- tools: Path = Path("/"),
sandbox: SandboxProtocol = nosandbox
) -> Path:
if src == dst:
logging.info(
f"Could not rename {src} to {dst} as they are located on different devices, falling back to copying"
)
- copy_tree(src, dst, use_subvolumes=use_subvolumes, tools=tools, sandbox=sandbox)
- rmtree(src, tools=tools, sandbox=sandbox)
+ copy_tree(src, dst, use_subvolumes=use_subvolumes, sandbox=sandbox)
+ rmtree(src, sandbox=sandbox)
return dst