From 79f5e82e5a4236c79867a41fd19d0cd010d6b215 Mon Sep 17 00:00:00 2001 From: Joerg Behrmann Date: Tue, 19 Sep 2023 10:42:02 +0200 Subject: [PATCH] treewide: address ruff warnings --- mkosi/__init__.py | 56 +++++--- mkosi/config.py | 69 ++++++---- mkosi/distributions/__init__.py | 6 +- mkosi/distributions/centos.py | 232 +++++++++++++++++++++++++++----- mkosi/distributions/fedora.py | 7 +- mkosi/distributions/opensuse.py | 4 +- mkosi/installer/apt.py | 4 +- mkosi/log.py | 3 +- mkosi/mounts.py | 8 +- mkosi/partition.py | 4 +- mkosi/qemu.py | 29 +++- mkosi/run.py | 15 ++- mkosi/util.py | 8 +- tests/test_config.py | 14 +- tests/test_versioncomp.py | 10 +- 15 files changed, 359 insertions(+), 110 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index c686902fd..3b2e5c61b 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -19,7 +19,7 @@ import textwrap import uuid from collections.abc import Iterator, Sequence from pathlib import Path -from typing import ContextManager, Optional, TextIO, Union +from typing import Optional, TextIO, Union from mkosi.archive import extract_tar, make_cpio, make_tar from mkosi.config import ( @@ -151,7 +151,9 @@ def install_build_packages(state: MkosiState) -> None: if not need_build_packages(state.config): return - with complete_step(f"Installing build packages for {str(state.config.distribution).capitalize()}"), mount_build_overlay(state): + # TODO: move to parenthesised context managers once on 3.10 + pd = str(state.config.distribution).capitalize() + with complete_step(f"Installing build packages for {pd}"), mount_build_overlay(state): state.config.distribution.install_packages(state, state.config.build_packages) @@ -196,7 +198,7 @@ def mount_cache_overlay(state: MkosiState) -> Iterator[None]: yield -def mount_build_overlay(state: MkosiState, read_only: bool = False) -> ContextManager[Path]: +def mount_build_overlay(state: MkosiState, read_only: bool = False) -> contextlib.AbstractContextManager[Path]: d = state.workspace / "build-overlay" if not d.is_symlink(): with umask(~0o755): @@ -559,7 +561,7 @@ def find_grub_bios_directory(state: MkosiState) -> Optional[Path]: def find_grub_binary(state: MkosiState, binary: str) -> Optional[Path]: path = ":".join(os.fspath(p) for p in [state.root / "usr/bin", state.root / "usr/sbin"]) - assert "grub" in binary and not "grub2" in binary + assert "grub" in binary and "grub2" not in binary path = shutil.which(binary, path=path) or shutil.which(binary.replace("grub", "grub2"), path=path) if not path: @@ -716,12 +718,18 @@ def prepare_grub_bios(state: MkosiState, partitions: Sequence[Partition]) -> Non kimg = Path(shutil.copy2(state.root / kimg, kdst / "vmlinuz")) kmods = Path(shutil.copy2(kmods, kdst / "kmods")) + distribution = state.config.distribution + image = kimg.relative_to(state.root / "efi") + cmdline = " ".join(state.config.kernel_command_line) + initrd = initrd.relative_to(state.root / "efi") + kmods = kmods.relative_to(state.root / "efi") + f.write( textwrap.dedent( f"""\ - menuentry "{state.config.distribution}-{kver}" {{ - linux /{kimg.relative_to(state.root / "efi")} {root} {" ".join(state.config.kernel_command_line)} - initrd /{initrd.relative_to(state.root / "efi")} /{kmods.relative_to(state.root / "efi")} + menuentry "{distribution}-{kver}" {{ + linux /{image} {root} {cmdline} + initrd /{initrd} /{kmods} }} """ ) @@ -914,7 +922,11 @@ def build_initrd(state: MkosiState) -> Path: "--make-initrd", "yes", "--bootable", "no", "--manifest-format", "", - *(["--source-date-epoch", 
str(state.config.source_date_epoch)] if state.config.source_date_epoch is not None else []), + *( + ["--source-date-epoch", str(state.config.source_date_epoch)] + if state.config.source_date_epoch is not None else + [] + ), *(["--locale", state.config.locale] if state.config.locale else []), *(["--locale-messages", state.config.locale_messages] if state.config.locale_messages else []), *(["--keymap", state.config.keymap] if state.config.keymap else []), @@ -1084,7 +1096,10 @@ def install_uki(state: MkosiState, partitions: Sequence[Partition]) -> None: shutil.copy(state.root / kimg, state.staging / state.config.output_split_kernel) break - if state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and state.config.bootable == ConfigFeature.auto: + if ( + state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and + state.config.bootable == ConfigFeature.auto + ): return if state.config.architecture.to_efi() is None and state.config.bootable == ConfigFeature.auto: @@ -1110,7 +1125,9 @@ def install_uki(state: MkosiState, partitions: Sequence[Partition]) -> None: if state.config.bootloader == Bootloader.uki: boot_binary = state.root / "efi/EFI/BOOT/BOOTX64.EFI" elif state.config.image_version: - boot_binary = state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi" + boot_binary = ( + state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi" + ) elif roothash: _, _, h = roothash.partition("=") boot_binary = state.root / f"efi/EFI/Linux/{image_id}-{kver}-{h}{boot_count}.efi" @@ -1245,7 +1262,7 @@ def calculate_signature(state: MkosiState) -> None: # Set the path of the keyring to use based on the environment if possible and fallback to the default # path. Without this the keyring for the root user will instead be used which will fail for a # non-root build. - env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(((Path(os.environ["HOME"]) / ".gnupg"))))) + env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(Path(os.environ["HOME"]) / ".gnupg"))) if sys.stderr.isatty(): env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno())) @@ -2079,7 +2096,8 @@ def bump_image_version(uid: int = -1, gid: int = -1) -> None: except ValueError: new_version = version + ".2" logging.info( - f"Last component of current version is not a decimal integer, appending '.2', bumping '{version}' → '{new_version}'." + "Last component of current version is not a decimal integer, " + f"appending '.2', bumping '{version}' → '{new_version}'." 
) else: new_version = ".".join(v[:-1] + [str(m + 1)]) @@ -2130,7 +2148,10 @@ def expand_specifier(s: str) -> str: def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool: - return args.verb.needs_build() and (args.force > 0 or not (config.output_dir / config.output_with_compression).exists()) + return ( + args.verb.needs_build() and + (args.force > 0 or not (config.output_dir / config.output_with_compression).exists()) + ) @contextlib.contextmanager @@ -2179,7 +2200,10 @@ def finalize_tools(args: MkosiArgs, presets: Sequence[MkosiConfig]) -> Sequence[ "--incremental", str(p.incremental), "--acl", str(p.acl), "--format", "directory", - *flatten(["--package", package] for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)), + *flatten( + ["--package", package] + for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)) + , "--output", f"{distribution}-tools", "--bootable", "no", "--manifest-format", "", @@ -2223,7 +2247,9 @@ def mount_tools(tree: Optional[Path]) -> Iterator[None]: continue (Path("/") / subdir).mkdir(parents=True, exist_ok=True) - stack.enter_context(mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True)) + stack.enter_context( + mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True) + ) yield diff --git a/mkosi/config.py b/mkosi/config.py index daa088dd0..3539831da 100644 --- a/mkosi/config.py +++ b/mkosi/config.py @@ -18,9 +18,9 @@ import shutil import subprocess import textwrap import uuid -from collections.abc import Iterable, Sequence +from collections.abc import Iterable, Iterator, Sequence from pathlib import Path -from typing import Any, Callable, Iterator, Optional, Type, Union, cast +from typing import Any, Callable, Optional, Union, cast from mkosi.architecture import Architecture from mkosi.distributions import Distribution, detect_distribution @@ -287,7 +287,10 @@ def config_parse_source_date_epoch(value: Optional[str], old: Optional[int]) -> def config_default_compression(namespace: argparse.Namespace) -> Compression: if namespace.output_format in (OutputFormat.cpio, OutputFormat.uki): - return Compression.xz if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8 else Compression.zst + if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8: + return Compression.xz + else: + return Compression.zst else: return Compression.none @@ -340,7 +343,7 @@ def config_default_source_date_epoch(namespace: argparse.Namespace) -> Optional[ return config_parse_source_date_epoch(os.environ.get("SOURCE_DATE_EPOCH"), None) -def make_enum_parser(type: Type[enum.Enum]) -> Callable[[str], enum.Enum]: +def make_enum_parser(type: type[enum.Enum]) -> Callable[[str], enum.Enum]: def parse_enum(value: str) -> enum.Enum: try: return type(value) @@ -350,14 +353,14 @@ def make_enum_parser(type: Type[enum.Enum]) -> Callable[[str], enum.Enum]: return parse_enum -def config_make_enum_parser(type: Type[enum.Enum]) -> ConfigParseCallback: +def config_make_enum_parser(type: type[enum.Enum]) -> ConfigParseCallback: def config_parse_enum(value: Optional[str], old: Optional[enum.Enum]) -> Optional[enum.Enum]: return make_enum_parser(type)(value) if value else None return config_parse_enum -def config_make_enum_matcher(type: Type[enum.Enum]) -> ConfigMatchCallback: +def config_make_enum_matcher(type: type[enum.Enum]) -> ConfigMatchCallback: def config_match_enum(match: str, value: enum.Enum) -> bool: return 
make_enum_parser(type)(match) == value @@ -553,7 +556,7 @@ class CustomHelpFormatter(argparse.HelpFormatter): subsequent_indent=subindent) for line in lines) -def config_make_action(settings: Sequence[MkosiConfigSetting]) -> Type[argparse.Action]: +def config_make_action(settings: Sequence[MkosiConfigSetting]) -> type[argparse.Action]: lookup = {s.dest: s for s in settings} class MkosiAction(argparse.Action): @@ -833,20 +836,20 @@ def parse_ini(path: Path, only_sections: Sequence[str] = ()) -> Iterator[tuple[s setting: Optional[str] = None value: Optional[str] = None - for l in textwrap.dedent(path.read_text()).splitlines(): + for line in textwrap.dedent(path.read_text()).splitlines(): # Systemd unit files allow both '#' and ';' to indicate comments so we do the same. for c in ("#", ";"): - comment = l.find(c) + comment = line.find(c) if comment >= 0: - l = l[:comment] + line = line[:comment] - if not l.strip(): + if not line.strip(): continue # If we have a section, setting and value, any line that's indented is considered part of the # setting's value. - if section and setting and value is not None and l[0].isspace(): - value = f"{value}\n{l.strip()}" + if section and setting and value is not None and line[0].isspace(): + value = f"{value}\n{line.strip()}" continue # So the line is not indented, that means we either found a new section or a new setting. Either way, @@ -855,29 +858,29 @@ def parse_ini(path: Path, only_sections: Sequence[str] = ()) -> Iterator[tuple[s yield section, setting, value setting = value = None - l = l.strip() + line = line.strip() - if l[0] == '[': - if l[-1] != ']': - die(f"{l} is not a valid section") + if line[0] == '[': + if line[-1] != ']': + die(f"{line} is not a valid section") - section = l[1:-1].strip() + section = line[1:-1].strip() if not section: die("Section name cannot be empty or whitespace") continue if not section: - die(f"Setting {l} is located outside of section") + die(f"Setting {line} is located outside of section") if only_sections and section not in only_sections: continue - setting, delimiter, value = l.partition("=") + setting, delimiter, value = line.partition("=") if not delimiter: die(f"Setting {setting} must be followed by '='") if not setting: - die(f"Missing setting name before '=' in {l}") + die(f"Missing setting name before '=' in {line}") setting = setting.strip() value = value.strip() @@ -1976,7 +1979,9 @@ def parse_config(argv: Sequence[str] = ()) -> tuple[MkosiArgs, tuple[MkosiConfig if path.exists(): logging.debug(f"Including configuration file {Path.cwd() / path}") - for section, k, v in parse_ini(path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"]): + for section, k, v in parse_ini( + path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"] + ): ns = defaults if k.startswith("@") else namespace if not (s := settings_lookup_by_name.get(k.removeprefix("@"))): @@ -2134,7 +2139,11 @@ def load_credentials(args: argparse.Namespace) -> dict[str, str]: if "firstboot.locale" not in creds: creds["firstboot.locale"] = "C.UTF-8" - if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add"): + if ( + args.ssh and + "ssh.authorized_keys.root" not in creds and + "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add") + ): key = run( ["ssh-add", "-L"], stdout=subprocess.PIPE, @@ -2269,8 +2278,13 @@ def load_config(args: argparse.Namespace) -> MkosiConfig: if args.incremental and not args.cache_dir: die("A cache 
directory must be configured in order to use --incremental") - # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later. - if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0: + # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available + # in Linux v5.11 and later. + if ( + (args.build_script is not None or args.base_trees) and + GenericVersion(platform.release()) < GenericVersion("5.11") and + os.geteuid() != 0 + ): die("This unprivileged build configuration requires at least Linux v5.11") return MkosiConfig.from_namespace(args) @@ -2321,9 +2335,8 @@ def line_join_source_target_list(array: Sequence[tuple[Path, Optional[Path]]]) - def summary(args: MkosiArgs, config: MkosiConfig) -> str: - b = Style.bold - e = Style.reset - bold: Callable[..., str] = lambda s: f"{b}{s}{e}" + def bold(s: Any) -> str: + return f"{Style.bold}{s}{Style.reset}" maniformats = (" ".join(i.name for i in config.manifest_format)) or "(none)" env = [f"{k}={v}" for k, v in config.environment.items()] diff --git a/mkosi/distributions/__init__.py b/mkosi/distributions/__init__.py index 10fb97ce9..3f0ee994d 100644 --- a/mkosi/distributions/__init__.py +++ b/mkosi/distributions/__init__.py @@ -4,7 +4,7 @@ import enum import importlib import re from collections.abc import Sequence -from typing import TYPE_CHECKING, Optional, Type, cast +from typing import TYPE_CHECKING, Optional, cast from mkosi.architecture import Architecture from mkosi.log import die @@ -122,13 +122,13 @@ class Distribution(StrEnum): def tools_tree_packages(self) -> list[str]: return self.installer().tools_tree_packages() - def installer(self) -> Type[DistributionInstaller]: + def installer(self) -> type[DistributionInstaller]: try: mod = importlib.import_module(f"mkosi.distributions.{self}") installer = getattr(mod, f"{str(self).title().replace('_','')}Installer") if not issubclass(installer, DistributionInstaller): die(f"Distribution installer for {self} is not a subclass of DistributionInstaller") - return cast(Type[DistributionInstaller], installer) + return cast(type[DistributionInstaller], installer) except (ImportError, AttributeError): die("No installer for this distribution.") diff --git a/mkosi/distributions/centos.py b/mkosi/distributions/centos.py index 5487083ad..afa994c05 100644 --- a/mkosi/distributions/centos.py +++ b/mkosi/distributions/centos.py @@ -28,8 +28,12 @@ def move_rpm_db(root: Path) -> None: newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent)) -class CentosInstaller(DistributionInstaller): +def join_mirror(config: MkosiConfig, link: str) -> str: + assert config.mirror is not None + return urllib.parse.urljoin(config.mirror, link) + +class CentosInstaller(DistributionInstaller): @classmethod def filesystem(cls) -> str: return "xfs" @@ -133,43 +137,113 @@ class CentosInstaller(DistributionInstaller): if config.mirror: if int(config.release) <= 8: return [ - Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/$basearch/os')}", cls.gpgurls()), - Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, 'centos-debuginfo/$stream/$basearch')}", cls.gpgurls(), enabled=False), - Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/Source')}", cls.gpgurls(), enabled=False), + Repo( + repo.lower(), + f"baseurl={join_mirror(config, 
f'centos/$stream/{repo}/$basearch/os')}", + cls.gpgurls() + ), + Repo( + f"{repo.lower()}-debuginfo", + f"baseurl={join_mirror(config, 'centos-debuginfo/$stream/$basearch')}", + cls.gpgurls(), + enabled=False, + ), + Repo( + f"{repo.lower()}-source", + f"baseurl={join_mirror(config, f'centos/$stream/{repo}/Source')}", + cls.gpgurls(), + enabled=False, + ), ] else: if repo == "extras": return [ - Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/$basearch/extras-common')}", cls.gpgurls()), - Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/source/extras-common')}", cls.gpgurls(), enabled=False), + Repo( + repo.lower(), + f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/$basearch/extras-common')}", + cls.gpgurls(), + ), + Repo( + f"{repo.lower()}-source", + f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/source/extras-common')}", + cls.gpgurls(), + enabled=False, + ), ] return [ - Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/os')}", cls.gpgurls()), - Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/debug/tree')}", cls.gpgurls(), enabled=False), - Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/source/tree')}", cls.gpgurls(), enabled=False), + Repo( + repo.lower(), + f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/os')}", + cls.gpgurls(), + ), + Repo( + f"{repo.lower()}-debuginfo", + f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/debug/tree')}", + cls.gpgurls(), + enabled=False, + ), + Repo( + f"{repo.lower()}-source", + f"baseurl={join_mirror(config, f'$stream/{repo}/source/tree')}", + cls.gpgurls(), + enabled=False, + ), ] else: if int(config.release) <= 8: return [ - Repo(repo.lower(), f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}", cls.gpgurls()), + Repo( + repo.lower(), + f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}", + cls.gpgurls(), + ), # These can't be retrieved from the mirrorlist. 
- Repo(f"{repo.lower()}-debuginfo", "baseurl=http://debuginfo.centos.org/$stream/$basearch", cls.gpgurls(), enabled=False), - Repo(f"{repo.lower()}-source", f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source", cls.gpgurls(), enabled=False), + Repo( + f"{repo.lower()}-debuginfo", + "baseurl=http://debuginfo.centos.org/$stream/$basearch", + cls.gpgurls(), + enabled=False, + ), + Repo( + f"{repo.lower()}-source", + f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source", + cls.gpgurls(), + enabled=False, + ), ] else: url = "metalink=https://mirrors.centos.org/metalink" if repo == "extras": return [ - Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream", cls.gpgurls()), - Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream", cls.gpgurls(), enabled=False), + Repo( + repo.lower(), + f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream", + cls.gpgurls(), + ), + Repo( + f"{repo.lower()}-source", + f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream", + cls.gpgurls(), + enabled=False, + ), ] return [ Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-{repo.lower()}-$stream", cls.gpgurls()), - Repo(f"{repo.lower()}-debuginfo", f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream", cls.gpgurls(), enabled=False), - Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream", cls.gpgurls(), enabled=False), + Repo( + f"{repo.lower()}-debuginfo", + f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream", + cls.gpgurls(), + enabled=False, + ), + Repo( + f"{repo.lower()}-source", + f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream", + cls.gpgurls(), + enabled=False, + ), ] @classmethod @@ -200,11 +274,31 @@ class CentosInstaller(DistributionInstaller): repos = [] if config.mirror: - for repo, dir in (("epel", "epel"), ("epel-next", "epel/next"), ("epel-testing", "epel/testing"), ("epel-next-testing", "epel/testing/next")): + for repo, dir in ( + ("epel", "epel"), + ("epel-next", "epel/next"), + ("epel-testing", "epel/testing"), + ("epel-next-testing", "epel/testing/next") + ): repos += [ - Repo(repo, f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch')}", gpgurls, enabled=False), - Repo(f"{repo}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch/debug')}", gpgurls, enabled=False), - Repo(f"{repo}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/source/tree')}", gpgurls, enabled=False), + Repo( + repo, + f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch')}", + gpgurls, + enabled=False, + ), + Repo( + f"{repo}-debuginfo", + f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch/debug')}", + gpgurls, + enabled=False, + ), + Repo( + f"{repo}-source", + f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/source/tree')}", + gpgurls, + enabled=False, + ), ] else: url = "metalink=https://mirrors.fedoraproject.org/metalink?arch=$basearch" @@ -220,8 +314,18 @@ class CentosInstaller(DistributionInstaller): Repo("epel-testing-debuginfo", f"{url}&repo=testing-debug-epel$releasever", gpgurls, enabled=False), Repo("epel-testing-source", f"{url}&repo=testing-source-epel$releasever", gpgurls, enabled=False), Repo("epel-next-testing", f"{url}&repo=epel-testing-next-$releasever", gpgurls, enabled=False), - Repo("epel-next-testing-debuginfo", 
f"{url}&repo=epel-testing-next-debug-$releasever", gpgurls, enabled=False), - Repo("epel-next-testing-source", f"{url}&repo=epel-testing-next-source-$releasever", gpgurls, enabled=False), + Repo( + "epel-next-testing-debuginfo", + f"{url}&repo=epel-testing-next-debug-$releasever", + gpgurls, + enabled=False, + ), + Repo( + "epel-next-testing-source", + f"{url}&repo=epel-testing-next-source-$releasever", + gpgurls, + enabled=False, + ), ] return repos @@ -246,30 +350,90 @@ class CentosInstaller(DistributionInstaller): if config.mirror: if int(config.release) <= 8: repos += [ - Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False), - Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{sig}/$basearch')}", gpgurls, enabled=False), - Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/Source')}", gpgurls, enabled=False), + Repo( + f"{sig}-{c}", + f"baseurl={join_mirror(config, f'centos/$stream/{sig}/$basearch/{c}')}", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-debuginfo", + f"baseurl={join_mirror(config, f'$stream/{sig}/$basearch')}", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-source", + f"baseurl={join_mirror(config, f'centos/$stream/{sig}/Source')}", + gpgurls, + enabled=False, + ), ] else: repos += [ - Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False), - Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}", gpgurls, enabled=False), - Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/source/{c}')}", gpgurls, enabled=False), + Repo( + f"{sig}-{c}", + f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}')}", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-debuginfo", + f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-source", + f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/source/{c}')}", + gpgurls, + enabled=False, + ), ] else: if int(config.release) <= 8: repos += [ - Repo(f"{sig}-{c}", f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}", gpgurls, enabled=False), + Repo( + f"{sig}-{c}", + f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}", + gpgurls, + enabled=False, + ), # These can't be retrieved from the mirrorlist. 
- Repo(f"{sig}-{c}-debuginfo", f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch", gpgurls, enabled=False), - Repo(f"{sig}-{c}-source", f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}", gpgurls, enabled=False), + Repo( + f"{sig}-{c}-debuginfo", + f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-source", + f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}", + gpgurls, + enabled=False, + ), ] else: url = "metalink=https://mirrors.centos.org/metalink" repos += [ - Repo(f"{sig}-{c}", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream", gpgurls, enabled=False), - Repo(f"{sig}-{c}-debuginfo", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream", gpgurls, enabled=False), - Repo(f"{sig}-{c}-source", f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream", gpgurls, enabled=False), + Repo( + f"{sig}-{c}", + f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-debuginfo", + f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream", + gpgurls, + enabled=False, + ), + Repo( + f"{sig}-{c}-source", + f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream", + gpgurls, + enabled=False, + ), ] repos += [ diff --git a/mkosi/distributions/fedora.py b/mkosi/distributions/fedora.py index 4133cc6e3..1bf86d532 100644 --- a/mkosi/distributions/fedora.py +++ b/mkosi/distributions/fedora.py @@ -110,7 +110,12 @@ class FedoraInstaller(DistributionInstaller): if state.config.release != "rawhide": repos += [ Repo("updates", f"{url}&repo=updates-released-f$releasever", gpgurls), - Repo("updates-debuginfo", f"{url}&repo=updates-released-debug-f$releasever", gpgurls, enabled=False), + Repo( + "updates-debuginfo", + f"{url}&repo=updates-released-debug-f$releasever", + gpgurls, + enabled=False, + ), Repo("updates-source", f"{url}&repo=updates-released-source-f$releasever", gpgurls, enabled=False), ] diff --git a/mkosi/distributions/opensuse.py b/mkosi/distributions/opensuse.py index 89d707b38..ff659b51c 100644 --- a/mkosi/distributions/opensuse.py +++ b/mkosi/distributions/opensuse.py @@ -95,7 +95,9 @@ class OpensuseInstaller(DistributionInstaller): else: repos = [Repo("repo-oss", f"baseurl={release_url}", fetch_gpgurls(release_url) if not zypper else ())] if updates_url is not None: - repos += [Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ())] + repos += [ + Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ()) + ] if zypper: setup_zypper(state, repos) diff --git a/mkosi/installer/apt.py b/mkosi/installer/apt.py index 50a0162c2..ca1f20195 100644 --- a/mkosi/installer/apt.py +++ b/mkosi/installer/apt.py @@ -60,7 +60,9 @@ def apt_cmd(state: MkosiState, command: str) -> list[PathString]: debarch = state.config.distribution.architecture(state.config.architecture) trustedkeys = state.pkgmngr / "etc/apt/trusted.gpg" - trustedkeys = trustedkeys if trustedkeys.exists() else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg" + trustedkeys = ( + trustedkeys if trustedkeys.exists() else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg" + ) trustedkeys_dir = state.pkgmngr / "etc/apt/trusted.gpg.d" trustedkeys_dir = trustedkeys_dir if trustedkeys_dir.exists() else "/usr/share/keyrings" diff --git a/mkosi/log.py b/mkosi/log.py index eb72c758f..a03a9a3cd 100644 --- a/mkosi/log.py +++ b/mkosi/log.py @@ 
-5,7 +5,8 @@ import contextvars import logging import os import sys -from typing import Any, Iterator, NoReturn, Optional +from collections.abc import Iterator +from typing import Any, NoReturn, Optional # This global should be initialized after parsing arguments ARG_DEBUG = contextvars.ContextVar("debug", default=False) diff --git a/mkosi/mounts.py b/mkosi/mounts.py index 0a5e5fc4a..b88494973 100644 --- a/mkosi/mounts.py +++ b/mkosi/mounts.py @@ -124,7 +124,13 @@ def mount_usr(tree: Optional[Path], umount: bool = True) -> Iterator[None]: # If we mounted over /usr, trying to use umount will fail with "target is busy", because umount is # being called from /usr, which we're trying to unmount. To work around this issue, we do a lazy # unmount. - with mount(what=tree / "usr", where=Path("/usr"), operation="--bind", read_only=True, umount=umount, lazy=True): + with mount( + what=tree / "usr", + where=Path("/usr"), + operation="--bind", + read_only=True, + umount=umount, lazy=True + ): yield finally: os.environ["PATH"] = old diff --git a/mkosi/partition.py b/mkosi/partition.py index 13c51f6a8..f8a4db1ee 100644 --- a/mkosi/partition.py +++ b/mkosi/partition.py @@ -1,9 +1,9 @@ import dataclasses import json import subprocess -from collections.abc import Sequence +from collections.abc import Mapping, Sequence from pathlib import Path -from typing import Any, Mapping, Optional +from typing import Any, Optional from mkosi.log import die from mkosi.run import run diff --git a/mkosi/qemu.py b/mkosi/qemu.py index 8019bf412..5423df180 100644 --- a/mkosi/qemu.py +++ b/mkosi/qemu.py @@ -12,8 +12,9 @@ import subprocess import sys import tempfile import uuid +from collections.abc import Iterator from pathlib import Path -from typing import Iterator, Optional +from typing import Optional from mkosi.architecture import Architecture from mkosi.config import ( @@ -227,7 +228,11 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None: die(f"{config.output_format} images cannot be booted with the '{config.qemu_firmware}' firmware") accel = "tcg" - auto = config.qemu_kvm == ConfigFeature.auto and config.architecture.is_native() and qemu_check_kvm_support(log=True) + auto = ( + config.qemu_kvm == ConfigFeature.auto and + config.architecture.is_native() and + qemu_check_kvm_support(log=True) + ) if config.qemu_kvm == ConfigFeature.enabled or auto: accel = "kvm" @@ -278,8 +283,13 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None: if config.architecture.supports_smbios(): for k, v in config.credentials.items(): - cmdline += ["-smbios", f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}"] - cmdline += ["-smbios", f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}"] + cmdline += [ + "-smbios", f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}" + ] + cmdline += [ + "-smbios", + f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}" + ] # QEMU has built-in logic to look for the BIOS firmware so we don't need to do anything special for that. 
if firmware == QemuFirmware.uefi: @@ -334,7 +344,10 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None: elif "-kernel" not in args.cmdline: kernel = config.output_dir / config.output_split_kernel if not kernel.exists(): - die("No kernel found, please install a kernel in the image or provide a -kernel argument to mkosi qemu") + die( + "No kernel found, please install a kernel in the image " + "or provide a -kernel argument to mkosi qemu" + ) else: kernel = None @@ -364,7 +377,11 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None: "-device", "virtio-scsi-pci,id=scsi", "-device", f"scsi-{'cd' if config.qemu_cdrom else 'hd'},drive=mkosi,bootindex=1"] - if firmware == QemuFirmware.uefi and config.qemu_swtpm != ConfigFeature.disabled and shutil.which("swtpm") is not None: + if ( + firmware == QemuFirmware.uefi and + config.qemu_swtpm != ConfigFeature.disabled and + shutil.which("swtpm") is not None + ): sock = stack.enter_context(start_swtpm()) cmdline += ["-chardev", f"socket,id=chrtpm,path={sock}", "-tpmdev", "emulator,id=tpm0,chardev=chrtpm"] diff --git a/mkosi/run.py b/mkosi/run.py index 8f77a2598..063712409 100644 --- a/mkosi/run.py +++ b/mkosi/run.py @@ -17,9 +17,10 @@ import sys import tempfile import textwrap import threading +from collections.abc import Awaitable, Mapping, Sequence from pathlib import Path from types import TracebackType -from typing import Any, Awaitable, Mapping, Optional, Sequence, Tuple, Type +from typing import Any, Optional from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die from mkosi.types import _FILE, CompletedProcess, PathString, Popen @@ -58,7 +59,10 @@ def read_subrange(path: Path) -> int: die(f"No mapping found for {user or uid} in {path}") if int(count) < SUBRANGE: - die(f"subuid/subgid range length must be at least {SUBRANGE}, got {count} for {user or uid} from line '{line}'") + die( + f"subuid/subgid range length must be at least {SUBRANGE}, " + f"got {count} for {user or uid} from line '{line}'" + ) return int(start) @@ -142,7 +146,7 @@ def foreground(*, new_process_group: bool = True) -> None: signal.signal(signal.SIGTTOU, old) -def ensure_exc_info() -> Tuple[Type[BaseException], BaseException, TracebackType]: +def ensure_exc_info() -> tuple[type[BaseException], BaseException, TracebackType]: exctype, exc, tb = sys.exc_info() assert exctype assert exc @@ -339,7 +343,8 @@ def bwrap( result = run([*cmdline, *cmd], env=env, log=False, stdin=stdin, stdout=stdout, input=input) except subprocess.CalledProcessError as e: if log: - logging.error(f"\"{shlex.join(os.fspath(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.") + c = shlex.join(os.fspath(s) for s in cmd) + logging.error(f"\"{c}\" returned non-zero exit code {e.returncode}.") if ARG_DEBUG_SHELL.get(): run([*cmdline, "sh"], stdin=sys.stdin, check=False, env=env, log=False) raise e @@ -465,7 +470,7 @@ class MkosiAsyncioThread(threading.Thread): def __exit__( self, - type: Optional[Type[BaseException]], + type: Optional[type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: diff --git a/mkosi/util.py b/mkosi/util.py index 11de8203a..3a8403117 100644 --- a/mkosi/util.py +++ b/mkosi/util.py @@ -67,8 +67,7 @@ def sort_packages(packages: Iterable[str]) -> list[str]: """Sorts packages: normal first, paths second, conditional third""" m = {"(": 2, "/": 1} - sort = lambda name: (m.get(name[0], 0), name) - return sorted(packages, key=sort) + return sorted(packages, key=lambda name: (m.get(name[0], 0), name)) def flatten(lists: 
Iterable[Iterable[T]]) -> list[T]: @@ -150,7 +149,10 @@ def qemu_check_vsock_support(log: bool) -> bool: return False elif e.errno in (errno.EPERM, errno.EACCES): if log: - logging.warning("Permission denied to access /dev/vhost-vsock. Not adding a vsock device to the virtual machine.") + logging.warning( + "Permission denied to access /dev/vhost-vsock. " + "Not adding a vsock device to the virtual machine." + ) return False raise e diff --git a/tests/test_config.py b/tests/test_config.py index 7f4608a27..1ecdf3c9c 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -26,13 +26,13 @@ def test_compression_enum_creation() -> None: def test_compression_enum_bool() -> None: - assert bool(Compression.none) == False - assert bool(Compression.zst) == True - assert bool(Compression.xz) == True - assert bool(Compression.bz2) == True - assert bool(Compression.gz) == True - assert bool(Compression.lz4) == True - assert bool(Compression.lzma) == True + assert not bool(Compression.none) + assert bool(Compression.zst) + assert bool(Compression.xz) + assert bool(Compression.bz2) + assert bool(Compression.gz) + assert bool(Compression.lz4) + assert bool(Compression.lzma) def test_compression_enum_str() -> None: diff --git a/tests/test_versioncomp.py b/tests/test_versioncomp.py index b6f8e8c94..98ec9d284 100644 --- a/tests/test_versioncomp.py +++ b/tests/test_versioncomp.py @@ -67,8 +67,14 @@ def test_generic_version_spec() -> None: 2 ) ) -def test_generic_version_strverscmp_improved_doc(s1: tuple[int, GenericVersion], s2: tuple[int, GenericVersion]) -> None: - """Example from the doc string of strverscmp_improved in systemd/src/fundamental/string-util-fundamental.c""" +def test_generic_version_strverscmp_improved_doc( + s1: tuple[int, GenericVersion], + s2: tuple[int, GenericVersion], +) -> None: + """Example from the doc string of strverscmp_improved. + + strverscmp_improved can be found in systemd/src/fundamental/string-util-fundamental.c + """ i1, v1 = s1 i2, v2 = s2 assert (v1 == v2) == (i1 == i2) -- 2.47.2
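
The "TODO: move to parenthesised context managers once on 3.10" comment added in
install_build_packages() refers to the with-statement syntax available from Python 3.10, which
would let the two context managers sit on separate lines without the helper variable introduced
here to keep the line under the length limit. A minimal sketch of that form, using stand-in
context managers rather than mkosi's real complete_step() and mount_build_overlay() (whose
signatures differ), assuming Python >= 3.10:

    import contextlib
    from collections.abc import Iterator

    @contextlib.contextmanager
    def complete_step(text: str) -> Iterator[None]:
        # Stand-in for mkosi's complete_step(): report when a step starts and finishes.
        print(f"{text}...")
        yield
        print("done")

    @contextlib.contextmanager
    def mount_build_overlay() -> Iterator[None]:
        # Stand-in for the real build overlay mount; only here to demonstrate the syntax.
        yield

    def install_build_packages(pd: str) -> None:
        # Python 3.10+: context managers may be parenthesised and split across lines,
        # so the over-long single-line `with a, b:` form is no longer needed.
        with (
            complete_step(f"Installing build packages for {pd}"),
            mount_build_overlay(),
        ):
            pass  # the real function installs state.config.build_packages here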
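
The join_mirror() helper factored out in mkosi/distributions/centos.py is a thin wrapper around
urllib.parse.urljoin(), which resolves the link relative to the directory of the base URL. The
URLs below are only illustrative, but they show how a trailing slash on the base changes whether
the repository path is appended or substituted:

    from urllib.parse import urljoin

    # Without a trailing slash, the last path component of the base is replaced:
    urljoin("https://mirror.example.com/centos", "9-stream/BaseOS/x86_64/os")
    # -> 'https://mirror.example.com/9-stream/BaseOS/x86_64/os'

    # With a trailing slash, the relative path is appended to the base:
    urljoin("https://mirror.example.com/centos/", "9-stream/BaseOS/x86_64/os")
    # -> 'https://mirror.example.com/centos/9-stream/BaseOS/x86_64/os'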