tmp_dir,
)
from mkosi.install import add_dropin_config_from_resource, copy_path, flock
-from mkosi.log import (
- ARG_DEBUG,
- MkosiException,
- MkosiNotSupportedException,
- MkosiPrinter,
- die,
- warn,
-)
+from mkosi.log import ARG_DEBUG, MkosiPrinter, die, warn
from mkosi.manifest import GenericVersion, Manifest
from mkosi.mounts import dissect_and_mount, mount_overlay, scandir_recursive
from mkosi.pager import page
OutputFormat.tar,
OutputFormat.cpio,
):
- die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.", MkosiNotSupportedException)
+ die("Directory, subvolume, tar, cpio, and plain squashfs images cannot be booted in qemu.")
if shutil.which("bsdtar") and args.distribution == Distribution.openmandriva and args.tar_strip_selinux_context:
- die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context", MkosiNotSupportedException)
+ die("Sorry, bsdtar on OpenMandriva is incompatible with --tar-strip-selinux-context")
if args.cache_dir:
args.cache_dir = args.cache_dir / f"{args.distribution}~{args.release}"
if args.verb in (Verb.shell, Verb.boot):
opname = "acquire shell" if args.verb == Verb.shell else "boot"
if args.output_format in (OutputFormat.tar, OutputFormat.cpio):
- die(f"Sorry, can't {opname} with a {args.output_format} archive.", MkosiNotSupportedException)
+ die(f"Sorry, can't {opname} with a {args.output_format} archive.")
if should_compress_output(args):
- die(f"Sorry, can't {opname} with a compressed image.", MkosiNotSupportedException)
+ die(f"Sorry, can't {opname} with a compressed image.")
if args.verb == Verb.qemu:
if not args.output_format == OutputFormat.disk:
- die("Sorry, can't boot non-disk images with qemu.", MkosiNotSupportedException)
+ die("Sorry, can't boot non-disk images with qemu.")
if args.repo_dirs and not (is_dnf_distribution(args.distribution) or args.distribution == Distribution.arch):
die("--repo-dir is only supported on DNF based distributions and Arch")
die("Must be invoked as root.")
-@contextlib.contextmanager
-def suppress_stacktrace() -> Iterator[None]:
- try:
- yield
- except subprocess.CalledProcessError as e:
- # MkosiException is silenced in main() so it doesn't print a stacktrace.
- raise MkosiException() from e
-
-
def machine_name(config: Union[MkosiConfig, argparse.Namespace]) -> str:
return config.image_id or config.output.with_suffix("").name.partition("_")[0]
if config.auto_bump:
bump_image_version(config)
- with suppress_stacktrace():
- if config.verb in (Verb.shell, Verb.boot):
- run_shell(config)
+ if config.verb in (Verb.shell, Verb.boot):
+ run_shell(config)
- if config.verb == Verb.qemu:
- run_qemu(config)
+ if config.verb == Verb.qemu:
+ run_qemu(config)
- if config.verb == Verb.ssh:
- run_ssh(config)
+ if config.verb == Verb.ssh:
+ run_ssh(config)
if config.verb == Verb.serve:
run_serve(config)
import contextlib
import os
+import subprocess
import sys
from collections.abc import Iterator
-from subprocess import CalledProcessError
from mkosi import load_args, run_verb
from mkosi.config import MkosiConfigParser
-from mkosi.log import MkosiException, die
+from mkosi.log import ARG_DEBUG, MkosiPrinter, die
from mkosi.run import excepthook
def propagate_failed_return() -> Iterator[None]:
try:
yield
- except MkosiException as e:
- cause = e.__cause__
- if cause and isinstance(cause, CalledProcessError):
- sys.exit(cause.returncode)
+ except SystemExit as e:
+ if ARG_DEBUG:
+ raise e
+
+ sys.exit(e.code)
+ except KeyboardInterrupt as e:
+ if ARG_DEBUG:
+ raise e
+
+ MkosiPrinter.error("Interrupted")
+ sys.exit(1)
+ except subprocess.CalledProcessError as e:
+ if ARG_DEBUG:
+ raise e
+
+ # We always log when subprocess.CalledProcessError is raised, so we don't log again here.
+ sys.exit(e.returncode)
+ except Exception as e:
+ if ARG_DEBUG:
+ raise e
+
+ MkosiPrinter.error("Error: mkosi failed because of an exception, rerun mkosi with --debug run to get more information")
sys.exit(1)
from typing import Any, Callable, Optional, TypeVar, Union
from mkosi.distributions import DistributionInstaller
-from mkosi.log import MkosiException, die
+from mkosi.log import die
T = TypeVar("T")
V = TypeVar("V")
def safe_tar_extract(tar: tarfile.TarFile, path: Path=Path("."), *, numeric_owner: bool=False) -> None:
"""Extract a tar without CVE-2007-4559.
- Throws a MkosiException if a member of the tar resolves to a path that would
+ Throws an exception if a member of the tar resolves to a path that would
be outside of the passed in target path.
Omits the member argument from TarFile.extractall, since we don't need it at
target.resolve().relative_to(path)
members += [member]
except ValueError as e:
- raise MkosiException(f"Attempted path traversal in tar file {tar.name!r}") from e
+ die(f"Attempted path traversal in tar file {tar.name!r}", e)
tar.extractall(path, members=members, numeric_owner=numeric_owner)
from mkosi.backend import MkosiState, safe_tar_extract
from mkosi.distributions import DistributionInstaller
from mkosi.install import copy_path, flock
-from mkosi.log import ARG_DEBUG, MkosiException, MkosiPrinter, complete_step, die
+from mkosi.log import ARG_DEBUG, MkosiPrinter, complete_step, die
from mkosi.remove import unlink_try_hard
from mkosi.run import run_workspace_command
except ImportError as e:
MkosiPrinter.warn(NEED_PORTAGE_MSG)
MkosiPrinter.info(PORTAGE_INSTALL_INSTRUCTIONS)
- raise MkosiException(e)
+ raise e
return dict(profile_path=PROFILE_PATH,
custom_profile_path=CUSTOM_PROFILE_PATH,
ARG_DEBUG: set[str] = set()
-class MkosiException(Exception):
- """Leads to sys.exit"""
-
-
-class MkosiNotSupportedException(MkosiException):
- """Leads to sys.exit when an invalid combination of parsed arguments happens"""
-
-
-def die(message: str, exception: type[MkosiException] = MkosiException) -> NoReturn:
+def die(message: str, exception: Optional[Exception] = None) -> NoReturn:
MkosiPrinter.error(f"Error: {message}")
- raise exception(message)
+ raise exception or RuntimeError(message)
def warn(message: str) -> None:
return result
-
-def run(
+def _run(
cmdline: Sequence[PathString],
check: bool = True,
stdout: _FILE = None,
try:
return subprocess.run(cmdline, check=check, stdout=stdout, stderr=stderr, env=env, **kwargs,
preexec_fn=foreground, close_fds=False)
- except FileNotFoundError:
- die(f"{cmdline[0]} not found in PATH.")
+ except FileNotFoundError as e:
+ die(f"{cmdline[0]} not found in PATH.", e)
+
+
+def run(
+ cmdline: Sequence[PathString],
+ check: bool = True,
+ stdout: _FILE = None,
+ stderr: _FILE = None,
+ env: Mapping[str, PathString] = {},
+ **kwargs: Any,
+) -> CompletedProcess:
+ try:
+ return _run(cmdline, check, stdout, stderr, env, **kwargs)
+ except subprocess.CalledProcessError as e:
+ die(f"\"{shlex.join(str(s) for s in cmdline)}\" returned non-zero exit code {e.returncode}.", e)
def spawn(
return subprocess.Popen(cmdline, stdout=stdout, stderr=stderr, **kwargs, preexec_fn=foreground)
except FileNotFoundError:
die(f"{cmdline[0]} not found in PATH.")
+ except subprocess.CalledProcessError as e:
+ die(f"\"{shlex.join(str(s) for s in cmdline)}\" returned non-zero exit code {e.returncode}.", e)
def run_with_apivfs(
template = f"chmod 1777 {state.root / 'tmp'} {state.root / 'var/tmp'} {state.root / 'dev/shm'} && exec {{}} || exit $?"
try:
- return run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
+ return _run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
text=True, stdout=stdout, stderr=stderr, env=env)
except subprocess.CalledProcessError as e:
if "run" in ARG_DEBUG:
- run([*cmdline, template.format("sh")], check=False, env=env)
- die(f"\"{shlex.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
-
+ _run([*cmdline, template.format("sh")], check=False, env=env)
+ raise e
def run_workspace_command(
state: MkosiState,
template = "chmod 1777 /tmp /var/tmp /dev/shm && PATH=$PATH:/usr/bin:/usr/sbin exec {} || exit $?"
try:
- return run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
+ return _run([*cmdline, template.format(shlex.join(str(s) for s in cmd))],
text=True, stdout=stdout, env=env)
except subprocess.CalledProcessError as e:
if "run" in ARG_DEBUG:
- run([*cmdline, template.format("sh")], check=False, env=env)
- die(f"\"{shlex.join(str(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
+ _run([*cmdline, template.format("sh")], check=False, env=env)
+ raise e
finally:
if state.workspace.joinpath("resolv.conf").is_symlink():
resolve.unlink(missing_ok=True)
set_umask,
strip_suffixes,
)
-from mkosi.log import MkosiException
-
def test_distribution() -> None:
assert Distribution.fedora.package_type == PackageType.rpm
assert (safe_target / name).exists()
evil_target = tmp_path / "evil_target"
- with pytest.raises(MkosiException):
+ with pytest.raises(ValueError):
with tarfile.TarFile.open(evil_tar) as t:
safe_tar_extract(t, evil_target)
assert not (evil_target / name).exists()
import mkosi
from mkosi.backend import Distribution, MkosiConfig, Verb
-from mkosi.log import MkosiException
from mkosi.config import MkosiConfigParser
def test_shell_boot() -> None:
with cd_temp_dir():
- with pytest.raises(MkosiException, match=".boot.*tar"):
+ with pytest.raises(RuntimeError, match=".boot.*tar"):
parse(["--format", "tar", "boot"])
- with pytest.raises(MkosiException, match=".boot.*cpio"):
+ with pytest.raises(RuntimeError, match=".boot.*cpio"):
parse(["--format", "cpio", "boot"])
- with pytest.raises(MkosiException, match=".boot.*compressed" ):
+ with pytest.raises(RuntimeError, match=".boot.*compressed" ):
parse(["--format", "disk", "--compress-output=yes", "boot"])