import textwrap
import uuid
from collections.abc import Iterator, Mapping, Sequence
+from contextlib import AbstractContextManager
from pathlib import Path
from typing import Optional, Union, cast
def finalize_host_scripts(
context: Context,
helpers: Mapping[str, Sequence[PathString]] = {},
-) -> contextlib.AbstractContextManager[Path]:
+) -> AbstractContextManager[Path]:
scripts: dict[str, Sequence[PathString]] = {}
for binary in ("useradd", "groupadd"):
if find_binary(binary, root=context.config.tools()):
],
options=["--dir", "/work/src", "--chdir", "/work/src"],
scripts=hd,
- ) + (chroot if script.suffix == ".chroot" else []),
+ extra=chroot if script.suffix == ".chroot" else [],
+ )
)
],
options=["--dir", "/work/src", "--chdir", "/work/src"],
scripts=hd,
- ) + (chroot if script.suffix == ".chroot" else []),
+ extra=chroot if script.suffix == ".chroot" else [],
+ ),
)
if context.want_local_repo() and context.config.output_format != OutputFormat.none:
],
options=["--dir", "/work/src", "--chdir", "/work/src"],
scripts=hd,
- ) + (chroot if script.suffix == ".chroot" else []),
+ extra=chroot if script.suffix == ".chroot" else [],
+ ),
)
],
options=["--dir", "/work/src", "--chdir", "/work/src"],
scripts=hd,
- ) + (chroot if script.suffix == ".chroot" else []),
+ extra=chroot if script.suffix == ".chroot" else [],
+ ),
)
Mount(context.staging, context.staging),
Mount(mountinfo.name, mountinfo.name),
],
- ) + ["sh", "-c", f"mount --bind {mountinfo.name} /proc/$$/mountinfo && exec $0 \"$@\""],
+ extra=["sh", "-c", f"mount --bind {mountinfo.name} /proc/$$/mountinfo && exec $0 \"$@\""],
+ ),
)
sandbox=context.sandbox(
mounts=mounts,
options=options,
- ) + ["setpriv", f"--reuid={INVOKING_USER.uid}", f"--regid={INVOKING_USER.gid}", "--clear-groups"]
+ extra=["setpriv", f"--reuid={INVOKING_USER.uid}", f"--regid={INVOKING_USER.gid}", "--clear-groups"],
+ )
)
with umask(~0o755):
dst.mkdir(parents=True, exist_ok=True)
- def sandbox(*, mounts: Sequence[Mount] = ()) -> list[PathString]:
+ def sandbox(*, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]:
return context.sandbox(mounts=[*mounts, *exclude])
copy_tree(
import typing
import uuid
from collections.abc import Collection, Iterable, Iterator, Sequence
+from contextlib import AbstractContextManager
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar, Union, cast
scripts: Optional[Path] = None,
mounts: Sequence[Mount] = (),
options: Sequence[PathString] = (),
- ) -> list[PathString]:
+ extra: Sequence[PathString] = (),
+ ) -> AbstractContextManager[list[PathString]]:
mounts = [
*[Mount(d, d, ro=True) for d in self.extra_search_paths if not relaxed and not self.tools_tree],
*([Mount(p, "/proxy.cacert", ro=True)] if (p := self.proxy_peer_certificate) else []),
tools=tools or self.tools(),
mounts=mounts,
options=options,
+ extra=extra,
)
# SPDX-License-Identifier: LGPL-2.1+
from collections.abc import Sequence
+from contextlib import AbstractContextManager
from pathlib import Path
from typing import Optional
scripts: Optional[Path] = None,
mounts: Sequence[Mount] = (),
options: Sequence[PathString] = (),
- ) -> list[PathString]:
+ extra: Sequence[PathString] = (),
+ ) -> AbstractContextManager[list[PathString]]:
+ if (self.pkgmngr / "usr").exists():
+ extra = [
+ "sh",
+ "-c",
+ f"mount -t overlay -o lowerdir={self.pkgmngr / 'usr'}:/usr overlayfs /usr && exec $0 \"$@\"",
+ *extra,
+ ]
+
return self.config.sandbox(
network=network,
devices=devices,
"--cap-add", "ALL",
*options,
],
- ) + (
- [
- "sh",
- "-c",
- f"mount -t overlay -o lowerdir={self.pkgmngr / 'usr'}:/usr overlayfs /usr && exec $0 \"$@\"",
- ] if (self.pkgmngr / "usr").exists() else []
+ extra=extra,
)
-
def want_local_repo(self) -> bool:
return any(self.packages.iterdir())
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources, *mounts],
options=["--dir", "/work/src", "--chdir", "/work/src"],
- ) + (apivfs_cmd() if apivfs else [])
+ extra=apivfs_cmd() if apivfs else []
+ )
),
env=context.config.environment | cls.finalize_environment(context),
stdout=stdout,
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
- ) + (apivfs_cmd() if apivfs else [])
+ extra=apivfs_cmd() if apivfs else [],
+ )
),
env=context.config.environment | cls.finalize_environment(context),
stdout=stdout,
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
- ) + (apivfs_cmd() if apivfs else [])
+ extra=apivfs_cmd() if apivfs else [],
+ )
),
env=context.config.environment | cls.finalize_environment(context),
stdout=stdout,
network=True,
mounts=[Mount(context.root, "/buildroot"), *cls.mounts(context), *sources],
options=["--dir", "/work/src", "--chdir", "/work/src"],
- ) + (apivfs_cmd() if apivfs else [])
+ extra=apivfs_cmd() if apivfs else [],
+ )
),
env=context.config.environment | cls.finalize_environment(context),
stdout=stdout,
import sys
import threading
from collections.abc import Awaitable, Collection, Iterator, Mapping, Sequence
+from contextlib import AbstractContextManager
from pathlib import Path
from types import TracebackType
from typing import Any, Callable, NoReturn, Optional
foreground: bool = True,
preexec_fn: Optional[Callable[[], None]] = None,
success_exit_status: Sequence[int] = (0,),
- sandbox: Sequence[PathString] = (),
+ sandbox: AbstractContextManager[Sequence[PathString]] = contextlib.nullcontext([]),
) -> CompletedProcess:
if input is not None:
assert stdin is None # stdin and input cannot be specified together
foreground: bool = False,
preexec_fn: Optional[Callable[[], None]] = None,
success_exit_status: Sequence[int] = (0,),
- sandbox: Sequence[PathString] = (),
+ sandbox: AbstractContextManager[Sequence[PathString]] = contextlib.nullcontext([]),
) -> Iterator[Popen]:
assert sorted(set(pass_fds)) == list(pass_fds)
- sandbox = [os.fspath(x) for x in sandbox]
cmdline = [os.fspath(x) for x in cmdline]
if ARG_DEBUG.get():
# expect it to pick.
assert nfd == SD_LISTEN_FDS_START + i
- # First, check if the sandbox works at all before executing the command.
- if sandbox and (rc := subprocess.run(sandbox + ["true"]).returncode) != 0:
- log_process_failure(sandbox, cmdline, rc)
- raise subprocess.CalledProcessError(rc, sandbox + cmdline)
+ with sandbox as sbx:
+ prefix = [os.fspath(x) for x in sbx]
- if subprocess.run(sandbox + ["sh", "-c", f"command -v {cmdline[0]}"], stdout=subprocess.DEVNULL).returncode != 0:
- die(f"{cmdline[0]} not found.", hint=f"Is {cmdline[0]} installed on the host system?")
+ # First, check if the sandbox works at all before executing the command.
+ if prefix and (rc := subprocess.run(prefix + ["true"]).returncode) != 0:
+ log_process_failure(prefix, cmdline, rc)
+ raise subprocess.CalledProcessError(rc, prefix + cmdline)
- if (
- foreground and
- sandbox and
- subprocess.run(sandbox + ["sh", "-c", "command -v setpgid"], stdout=subprocess.DEVNULL).returncode == 0
- ):
- sandbox += ["setpgid", "--foreground", "--"]
+ if subprocess.run(
+ prefix + ["sh", "-c", f"command -v {cmdline[0]}"],
+ stdout=subprocess.DEVNULL,
+ ).returncode != 0:
+ die(f"{cmdline[0]} not found.", hint=f"Is {cmdline[0]} installed on the host system?")
- if pass_fds:
- # We don't know the PID before we start the process and we can't modify the environment in preexec_fn so we
- # have to spawn a temporary shell to set the necessary environment variables before spawning the actual
- # command.
- sandbox += ["sh", "-c", f"LISTEN_FDS={len(pass_fds)} LISTEN_PID=$$ exec $0 \"$@\""]
+ if (
+ foreground and
+ prefix and
+ subprocess.run(prefix + ["sh", "-c", "command -v setpgid"], stdout=subprocess.DEVNULL).returncode == 0
+ ):
+ prefix += ["setpgid", "--foreground", "--"]
- try:
- with subprocess.Popen(
- sandbox + cmdline,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- text=True,
- user=user,
- group=group,
- # pass_fds only comes into effect after python has invoked the preexec function, so we make sure that
- # pass_fds contains the file descriptors to keep open after we've done our transformation in preexec().
- pass_fds=[SD_LISTEN_FDS_START + i for i in range(len(pass_fds))],
- env=env,
- cwd=cwd,
- preexec_fn=preexec,
- ) as proc:
- try:
- yield proc
- except BaseException:
- proc.terminate()
- raise
- finally:
- returncode = proc.wait()
-
- if check and returncode not in success_exit_status:
- if log:
- log_process_failure(sandbox, cmdline, returncode)
- if ARG_DEBUG_SHELL.get():
- subprocess.run(
- [*sandbox, "bash"],
- check=False,
- stdin=sys.stdin,
- text=True,
- user=user,
- group=group,
- env=env,
- cwd=cwd,
- preexec_fn=preexec,
- )
- raise subprocess.CalledProcessError(returncode, cmdline)
- except FileNotFoundError as e:
- die(f"{e.filename} not found.")
- finally:
- if foreground:
- make_foreground_process(new_process_group=False)
+ if pass_fds:
+ # We don't know the PID before we start the process and we can't modify the environment in preexec_fn so we
+ # have to spawn a temporary shell to set the necessary environment variables before spawning the actual
+ # command.
+ prefix += ["sh", "-c", f"LISTEN_FDS={len(pass_fds)} LISTEN_PID=$$ exec $0 \"$@\""]
+
+ try:
+ with subprocess.Popen(
+ prefix + cmdline,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ text=True,
+ user=user,
+ group=group,
+ # pass_fds only comes into effect after python has invoked the preexec function, so we make sure that
+ # pass_fds contains the file descriptors to keep open after we've done our transformation in preexec().
+ pass_fds=[SD_LISTEN_FDS_START + i for i in range(len(pass_fds))],
+ env=env,
+ cwd=cwd,
+ preexec_fn=preexec,
+ ) as proc:
+ try:
+ yield proc
+ except BaseException:
+ proc.terminate()
+ raise
+ finally:
+ returncode = proc.wait()
+
+ if check and returncode not in success_exit_status:
+ if log:
+ log_process_failure(prefix, cmdline, returncode)
+ if ARG_DEBUG_SHELL.get():
+ subprocess.run(
+ [*prefix, "bash"],
+ check=False,
+ stdin=sys.stdin,
+ text=True,
+ user=user,
+ group=group,
+ env=env,
+ cwd=cwd,
+ preexec_fn=preexec,
+ )
+ raise subprocess.CalledProcessError(returncode, cmdline)
+ except FileNotFoundError as e:
+ die(f"{e.filename} not found.")
+ finally:
+ if foreground:
+ make_foreground_process(new_process_group=False)
def find_binary(*names: PathString, root: Path = Path("/")) -> Optional[Path]:
# SPDX-License-Identifier: LGPL-2.1+
+import contextlib
import enum
import logging
import os
+import shutil
import uuid
-from collections.abc import Sequence
+from collections.abc import Iterator, Sequence
+from contextlib import AbstractContextManager
from pathlib import Path
from typing import NamedTuple, Optional, Protocol
class SandboxProtocol(Protocol):
- def __call__(self, *, mounts: Sequence[Mount] = ()) -> list[PathString]: ...
+ def __call__(self, *, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]: ...
-def nosandbox(*, mounts: Sequence[Mount] = ()) -> list[PathString]:
- return []
+def nosandbox(*, mounts: Sequence[Mount] = ()) -> AbstractContextManager[list[PathString]]:
+ return contextlib.nullcontext([])
# https://github.com/torvalds/linux/blob/master/include/uapi/linux/capability.h
return flatten(m.options() for m in mounts)
+@contextlib.contextmanager
def sandbox_cmd(
*,
network: bool = False,
relaxed: bool = False,
mounts: Sequence[Mount] = (),
options: Sequence[PathString] = (),
-) -> list[PathString]:
+ extra: Sequence[PathString] = (),
+) -> Iterator[list[PathString]]:
cmdline: list[PathString] = []
mounts = list(mounts)
if not relaxed:
- # We want to use an empty subdirectory in the host's temporary directory as the sandbox's /var/tmp. To make
- # sure it only gets created when we run the sandboxed command and cleaned up when the sandboxed command exits,
- # we create it using shell.
+ # We want to use an empty subdirectory in the host's temporary directory as the sandbox's /var/tmp.
vartmp = Path(os.getenv("TMPDIR", "/var/tmp")) / f"mkosi-var-tmp-{uuid.uuid4().hex[:16]}"
- cmdline += ["sh", "-c", f"trap 'rm -rf {vartmp}' EXIT && mkdir --mode 1777 {vartmp} && $0 \"$@\""]
else:
vartmp = None
ops += ["chmod 755 /etc"]
ops += ["exec $0 \"$@\""]
- cmdline += ["sh", "-c", " && ".join(ops)]
+ cmdline += ["sh", "-c", " && ".join(ops), *extra]
- return cmdline
+ if vartmp:
+ vartmp.mkdir(mode=0o1777)
+
+ try:
+ yield cmdline
+ finally:
+ if vartmp:
+ shutil.rmtree(vartmp)
def apivfs_cmd() -> list[PathString]: