git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
treewide: address ruff warnings (1896/head)
author Joerg Behrmann <behrmann@physik.fu-berlin.de>
Tue, 19 Sep 2023 08:42:02 +0000 (10:42 +0200)
committer Joerg Behrmann <behrmann@physik.fu-berlin.de>
Tue, 19 Sep 2023 10:06:04 +0000 (12:06 +0200)
15 files changed:
mkosi/__init__.py
mkosi/config.py
mkosi/distributions/__init__.py
mkosi/distributions/centos.py
mkosi/distributions/fedora.py
mkosi/distributions/opensuse.py
mkosi/installer/apt.py
mkosi/log.py
mkosi/mounts.py
mkosi/partition.py
mkosi/qemu.py
mkosi/run.py
mkosi/util.py
tests/test_config.py
tests/test_versioncomp.py

index c686902fd3b99b11d092674c972f460d951f0765..3b2e5c61b859718fef2df56e68a01c799a7b8cfe 100644 (file)
@@ -19,7 +19,7 @@ import textwrap
 import uuid
 from collections.abc import Iterator, Sequence
 from pathlib import Path
-from typing import ContextManager, Optional, TextIO, Union
+from typing import Optional, TextIO, Union
 
 from mkosi.archive import extract_tar, make_cpio, make_tar
 from mkosi.config import (
@@ -151,7 +151,9 @@ def install_build_packages(state: MkosiState) -> None:
     if not need_build_packages(state.config):
         return
 
-    with complete_step(f"Installing build packages for {str(state.config.distribution).capitalize()}"), mount_build_overlay(state):
+    # TODO: move to parenthesised context managers once on 3.10
+    pd = str(state.config.distribution).capitalize()
+    with complete_step(f"Installing build packages for {pd}"), mount_build_overlay(state):
         state.config.distribution.install_packages(state, state.config.build_packages)
 
 
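Editor's note: the TODO above refers to Python 3.10's parenthesised with-statements. A minimal sketch of that form, assuming a 3.10+ interpreter; nullcontext() is only a stand-in for complete_step() and mount_build_overlay(), which are not reproduced here:

    from contextlib import nullcontext

    # Parenthesised context managers need Python 3.10 or newer.
    with (
        nullcontext("step"),     # stand-in for complete_step(...)
        nullcontext("overlay"),  # stand-in for mount_build_overlay(state)
    ):
        pass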
@@ -196,7 +198,7 @@ def mount_cache_overlay(state: MkosiState) -> Iterator[None]:
         yield
 
 
-def mount_build_overlay(state: MkosiState, read_only: bool = False) -> ContextManager[Path]:
+def mount_build_overlay(state: MkosiState, read_only: bool = False) -> contextlib.AbstractContextManager[Path]:
     d = state.workspace / "build-overlay"
     if not d.is_symlink():
         with umask(~0o755):
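Editor's note: typing.ContextManager has been a deprecated alias of contextlib.AbstractContextManager since Python 3.9, which is presumably what ruff flags here. A minimal sketch of the annotation style this hunk moves to; the helper below is hypothetical, not mkosi code:

    import contextlib
    from pathlib import Path

    # contextlib.AbstractContextManager is subscriptable on Python 3.9+.
    def build_overlay_stub() -> contextlib.AbstractContextManager[Path]:
        return contextlib.nullcontext(Path("build-overlay"))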
@@ -559,7 +561,7 @@ def find_grub_bios_directory(state: MkosiState) -> Optional[Path]:
 def find_grub_binary(state: MkosiState, binary: str) -> Optional[Path]:
     path = ":".join(os.fspath(p) for p in [state.root / "usr/bin", state.root / "usr/sbin"])
 
-    assert "grub" in binary and not "grub2" in binary
+    assert "grub" in binary and "grub2" not in binary
 
     path = shutil.which(binary, path=path) or shutil.which(binary.replace("grub", "grub2"), path=path)
     if not path:
@@ -716,12 +718,18 @@ def prepare_grub_bios(state: MkosiState, partitions: Sequence[Partition]) -> Non
                 kimg = Path(shutil.copy2(state.root / kimg, kdst / "vmlinuz"))
                 kmods = Path(shutil.copy2(kmods, kdst / "kmods"))
 
+                distribution = state.config.distribution
+                image = kimg.relative_to(state.root / "efi")
+                cmdline = " ".join(state.config.kernel_command_line)
+                initrd = initrd.relative_to(state.root / "efi")
+                kmods = kmods.relative_to(state.root / "efi")
+
                 f.write(
                     textwrap.dedent(
                         f"""\
-                        menuentry "{state.config.distribution}-{kver}" {{
-                            linux /{kimg.relative_to(state.root / "efi")} {root} {" ".join(state.config.kernel_command_line)}
-                            initrd /{initrd.relative_to(state.root / "efi")} /{kmods.relative_to(state.root / "efi")}
+                        menuentry "{distribution}-{kver}" {{
+                            linux /{image} {root} {cmdline}
+                            initrd /{initrd} /{kmods}
                         }}
                         """
                     )
@@ -914,7 +922,11 @@ def build_initrd(state: MkosiState) -> Path:
         "--make-initrd", "yes",
         "--bootable", "no",
         "--manifest-format", "",
-        *(["--source-date-epoch", str(state.config.source_date_epoch)] if state.config.source_date_epoch is not None else []),
+        *(
+            ["--source-date-epoch", str(state.config.source_date_epoch)]
+            if state.config.source_date_epoch is not None else
+            []
+        ),
         *(["--locale", state.config.locale] if state.config.locale else []),
         *(["--locale-messages", state.config.locale_messages] if state.config.locale_messages else []),
         *(["--keymap", state.config.keymap] if state.config.keymap else []),
@@ -1084,7 +1096,10 @@ def install_uki(state: MkosiState, partitions: Sequence[Partition]) -> None:
         shutil.copy(state.root / kimg, state.staging / state.config.output_split_kernel)
         break
 
-    if state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and state.config.bootable == ConfigFeature.auto:
+    if (
+        state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and
+        state.config.bootable == ConfigFeature.auto
+    ):
         return
 
     if state.config.architecture.to_efi() is None and state.config.bootable == ConfigFeature.auto:
@@ -1110,7 +1125,9 @@ def install_uki(state: MkosiState, partitions: Sequence[Partition]) -> None:
             if state.config.bootloader == Bootloader.uki:
                 boot_binary = state.root / "efi/EFI/BOOT/BOOTX64.EFI"
             elif state.config.image_version:
-                boot_binary = state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi"
+                boot_binary = (
+                    state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi"
+                )
             elif roothash:
                 _, _, h = roothash.partition("=")
                 boot_binary = state.root / f"efi/EFI/Linux/{image_id}-{kver}-{h}{boot_count}.efi"
@@ -1245,7 +1262,7 @@ def calculate_signature(state: MkosiState) -> None:
         # Set the path of the keyring to use based on the environment if possible and fallback to the default
         # path. Without this the keyring for the root user will instead be used which will fail for a
         # non-root build.
-        env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(((Path(os.environ["HOME"]) / ".gnupg")))))
+        env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(Path(os.environ["HOME"]) / ".gnupg")))
         if sys.stderr.isatty():
             env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno()))
 
@@ -2079,7 +2096,8 @@ def bump_image_version(uid: int = -1, gid: int = -1) -> None:
     except ValueError:
         new_version = version + ".2"
         logging.info(
-            f"Last component of current version is not a decimal integer, appending '.2', bumping '{version}' → '{new_version}'."
+            "Last component of current version is not a decimal integer, "
+            f"appending '.2', bumping '{version}' → '{new_version}'."
         )
     else:
         new_version = ".".join(v[:-1] + [str(m + 1)])
@@ -2130,7 +2148,10 @@ def expand_specifier(s: str) -> str:
 
 
 def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
-    return args.verb.needs_build() and (args.force > 0 or not (config.output_dir / config.output_with_compression).exists())
+    return (
+        args.verb.needs_build() and
+        (args.force > 0 or not (config.output_dir / config.output_with_compression).exists())
+    )
 
 
 @contextlib.contextmanager
@@ -2179,7 +2200,10 @@ def finalize_tools(args: MkosiArgs, presets: Sequence[MkosiConfig]) -> Sequence[
             "--incremental", str(p.incremental),
             "--acl", str(p.acl),
             "--format", "directory",
-            *flatten(["--package", package] for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)),
+            *flatten(
+                ["--package", package]
+                for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)
+            ),
             "--output", f"{distribution}-tools",
             "--bootable", "no",
             "--manifest-format", "",
@@ -2223,7 +2247,9 @@ def mount_tools(tree: Optional[Path]) -> Iterator[None]:
                 continue
 
             (Path("/") / subdir).mkdir(parents=True, exist_ok=True)
-            stack.enter_context(mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True))
+            stack.enter_context(
+                mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True)
+            )
 
         yield
 
index daa088dd0dffe1626300c76270cc3249fca8fc58..3539831da76ae38ed4d94fb2958ec79c1de74a25 100644 (file)
@@ -18,9 +18,9 @@ import shutil
 import subprocess
 import textwrap
 import uuid
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable, Iterator, Sequence
 from pathlib import Path
-from typing import Any, Callable, Iterator, Optional, Type, Union, cast
+from typing import Any, Callable, Optional, Union, cast
 
 from mkosi.architecture import Architecture
 from mkosi.distributions import Distribution, detect_distribution
@@ -287,7 +287,10 @@ def config_parse_source_date_epoch(value: Optional[str], old: Optional[int]) ->
 
 def config_default_compression(namespace: argparse.Namespace) -> Compression:
     if namespace.output_format in (OutputFormat.cpio, OutputFormat.uki):
-        return Compression.xz if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8 else Compression.zst
+        if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8:
+            return Compression.xz
+        else:
+            return Compression.zst
     else:
         return Compression.none
 
@@ -340,7 +343,7 @@ def config_default_source_date_epoch(namespace: argparse.Namespace) -> Optional[
     return config_parse_source_date_epoch(os.environ.get("SOURCE_DATE_EPOCH"), None)
 
 
-def make_enum_parser(type: Type[enum.Enum]) -> Callable[[str], enum.Enum]:
+def make_enum_parser(type: type[enum.Enum]) -> Callable[[str], enum.Enum]:
     def parse_enum(value: str) -> enum.Enum:
         try:
             return type(value)
@@ -350,14 +353,14 @@ def make_enum_parser(type: Type[enum.Enum]) -> Callable[[str], enum.Enum]:
     return parse_enum
 
 
-def config_make_enum_parser(type: Type[enum.Enum]) -> ConfigParseCallback:
+def config_make_enum_parser(type: type[enum.Enum]) -> ConfigParseCallback:
     def config_parse_enum(value: Optional[str], old: Optional[enum.Enum]) -> Optional[enum.Enum]:
         return make_enum_parser(type)(value) if value else None
 
     return config_parse_enum
 
 
-def config_make_enum_matcher(type: Type[enum.Enum]) -> ConfigMatchCallback:
+def config_make_enum_matcher(type: type[enum.Enum]) -> ConfigMatchCallback:
     def config_match_enum(match: str, value: enum.Enum) -> bool:
         return make_enum_parser(type)(match) == value
 
@@ -553,7 +556,7 @@ class CustomHelpFormatter(argparse.HelpFormatter):
                                      subsequent_indent=subindent) for line in lines)
 
 
-def config_make_action(settings: Sequence[MkosiConfigSetting]) -> Type[argparse.Action]:
+def config_make_action(settings: Sequence[MkosiConfigSetting]) -> type[argparse.Action]:
     lookup = {s.dest: s for s in settings}
 
     class MkosiAction(argparse.Action):
@@ -833,20 +836,20 @@ def parse_ini(path: Path, only_sections: Sequence[str] = ()) -> Iterator[tuple[s
     setting: Optional[str] = None
     value: Optional[str] = None
 
-    for l in textwrap.dedent(path.read_text()).splitlines():
+    for line in textwrap.dedent(path.read_text()).splitlines():
         # Systemd unit files allow both '#' and ';' to indicate comments so we do the same.
         for c in ("#", ";"):
-            comment = l.find(c)
+            comment = line.find(c)
             if comment >= 0:
-                l = l[:comment]
+                line = line[:comment]
 
-        if not l.strip():
+        if not line.strip():
             continue
 
         # If we have a section, setting and value, any line that's indented is considered part of the
         # setting's value.
-        if section and setting and value is not None and l[0].isspace():
-            value = f"{value}\n{l.strip()}"
+        if section and setting and value is not None and line[0].isspace():
+            value = f"{value}\n{line.strip()}"
             continue
 
         # So the line is not indented, that means we either found a new section or a new setting. Either way,
@@ -855,29 +858,29 @@ def parse_ini(path: Path, only_sections: Sequence[str] = ()) -> Iterator[tuple[s
             yield section, setting, value
             setting = value = None
 
-        l = l.strip()
+        line = line.strip()
 
-        if l[0] == '[':
-            if l[-1] != ']':
-                die(f"{l} is not a valid section")
+        if line[0] == '[':
+            if line[-1] != ']':
+                die(f"{line} is not a valid section")
 
-            section = l[1:-1].strip()
+            section = line[1:-1].strip()
             if not section:
                 die("Section name cannot be empty or whitespace")
 
             continue
 
         if not section:
-            die(f"Setting {l} is located outside of section")
+            die(f"Setting {line} is located outside of section")
 
         if only_sections and section not in only_sections:
             continue
 
-        setting, delimiter, value = l.partition("=")
+        setting, delimiter, value = line.partition("=")
         if not delimiter:
             die(f"Setting {setting} must be followed by '='")
         if not setting:
-            die(f"Missing setting name before '=' in {l}")
+            die(f"Missing setting name before '=' in {line}")
 
         setting = setting.strip()
         value = value.strip()
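Editor's note: the comments in this hunk describe systemd-style parsing, where both '#' and ';' start comments and an indented line extends the previous value. A hypothetical fragment to make the continuation rule concrete, assuming parse_ini() flushes the final value at end of input:

    fragment = (
        "[Content]\n"
        "Packages=vim\n"
        "         less  ; indented, so appended to the previous value\n"
    )
    # Written to a file and fed to parse_ini(), this would be expected to
    # yield ("Content", "Packages", "vim\nless").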
@@ -1976,7 +1979,9 @@ def parse_config(argv: Sequence[str] = ()) -> tuple[MkosiArgs, tuple[MkosiConfig
         if path.exists():
             logging.debug(f"Including configuration file {Path.cwd() / path}")
 
-            for section, k, v in parse_ini(path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"]):
+            for section, k, v in parse_ini(
+                path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"]
+            ):
                 ns = defaults if k.startswith("@") else namespace
 
                 if not (s := settings_lookup_by_name.get(k.removeprefix("@"))):
@@ -2134,7 +2139,11 @@ def load_credentials(args: argparse.Namespace) -> dict[str, str]:
     if "firstboot.locale" not in creds:
         creds["firstboot.locale"] = "C.UTF-8"
 
-    if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add"):
+    if (
+        args.ssh and
+        "ssh.authorized_keys.root" not in creds and
+        "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add")
+    ):
         key = run(
             ["ssh-add", "-L"],
             stdout=subprocess.PIPE,
@@ -2269,8 +2278,13 @@ def load_config(args: argparse.Namespace) -> MkosiConfig:
     if args.incremental and not args.cache_dir:
         die("A cache directory must be configured in order to use --incremental")
 
-    # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
-    if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
+    # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available
+    # in Linux v5.11 and later.
+    if (
+        (args.build_script is not None or args.base_trees) and
+        GenericVersion(platform.release()) < GenericVersion("5.11") and
+        os.geteuid() != 0
+    ):
         die("This unprivileged build configuration requires at least Linux v5.11")
 
     return MkosiConfig.from_namespace(args)
@@ -2321,9 +2335,8 @@ def line_join_source_target_list(array: Sequence[tuple[Path, Optional[Path]]]) -
 
 
 def summary(args: MkosiArgs, config: MkosiConfig) -> str:
-    b = Style.bold
-    e = Style.reset
-    bold: Callable[..., str] = lambda s: f"{b}{s}{e}"
+    def bold(s: Any) -> str:
+        return f"{Style.bold}{s}{Style.reset}"
 
     maniformats = (" ".join(i.name for i in config.manifest_format)) or "(none)"
     env = [f"{k}={v}" for k, v in config.environment.items()]
index 10fb97ce9a999b3dc15da60cb18cb68a75256c24..3f0ee994de0efe335e7825c0ac3e349176f528c6 100644 (file)
@@ -4,7 +4,7 @@ import enum
 import importlib
 import re
 from collections.abc import Sequence
-from typing import TYPE_CHECKING, Optional, Type, cast
+from typing import TYPE_CHECKING, Optional, cast
 
 from mkosi.architecture import Architecture
 from mkosi.log import die
@@ -122,13 +122,13 @@ class Distribution(StrEnum):
     def tools_tree_packages(self) -> list[str]:
         return self.installer().tools_tree_packages()
 
-    def installer(self) -> Type[DistributionInstaller]:
+    def installer(self) -> type[DistributionInstaller]:
         try:
             mod = importlib.import_module(f"mkosi.distributions.{self}")
             installer = getattr(mod, f"{str(self).title().replace('_','')}Installer")
             if not issubclass(installer, DistributionInstaller):
                 die(f"Distribution installer for {self} is not a subclass of DistributionInstaller")
-            return cast(Type[DistributionInstaller], installer)
+            return cast(type[DistributionInstaller], installer)
         except (ImportError, AttributeError):
             die("No installer for this distribution.")
 
index 5487083adaa7dd8453100507ddd409e379e98e57..afa994c057a4f076381c85fd491181b229273dbd 100644 (file)
@@ -28,8 +28,12 @@ def move_rpm_db(root: Path) -> None:
             newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent))
 
 
-class CentosInstaller(DistributionInstaller):
+def join_mirror(config: MkosiConfig, link: str) -> str:
+    assert config.mirror is not None
+    return urllib.parse.urljoin(config.mirror, link)
+
 
+class CentosInstaller(DistributionInstaller):
     @classmethod
     def filesystem(cls) -> str:
         return "xfs"
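Editor's note: join_mirror() centralises the urljoin() calls and the `config.mirror is not None` assertion that the long Repo(...) lines below kept repeating. urljoin() is sensitive to a trailing slash on the mirror; a small illustration with hypothetical URLs:

    import urllib.parse

    # With a trailing slash the relative link is appended to the mirror path.
    urllib.parse.urljoin("https://mirror.example.com/pub/", "centos/$stream/BaseOS/$basearch/os")
    # -> 'https://mirror.example.com/pub/centos/$stream/BaseOS/$basearch/os'

    # Without one, the last path component of the mirror is replaced.
    urllib.parse.urljoin("https://mirror.example.com/pub", "centos/$stream/BaseOS/$basearch/os")
    # -> 'https://mirror.example.com/centos/$stream/BaseOS/$basearch/os'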
@@ -133,43 +137,113 @@ class CentosInstaller(DistributionInstaller):
         if config.mirror:
             if int(config.release) <= 8:
                 return [
-                    Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/$basearch/os')}", cls.gpgurls()),
-                    Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, 'centos-debuginfo/$stream/$basearch')}", cls.gpgurls(), enabled=False),
-                    Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/Source')}", cls.gpgurls(), enabled=False),
+                    Repo(
+                        repo.lower(),
+                        f"baseurl={join_mirror(config, f'centos/$stream/{repo}/$basearch/os')}",
+                        cls.gpgurls(),
+                    ),
+                    Repo(
+                        f"{repo.lower()}-debuginfo",
+                        f"baseurl={join_mirror(config, 'centos-debuginfo/$stream/$basearch')}",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo.lower()}-source",
+                        f"baseurl={join_mirror(config, f'centos/$stream/{repo}/Source')}",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
                 ]
             else:
                 if repo == "extras":
                     return [
-                        Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/$basearch/extras-common')}", cls.gpgurls()),
-                        Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/source/extras-common')}", cls.gpgurls(), enabled=False),
+                        Repo(
+                            repo.lower(),
+                            f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/$basearch/extras-common')}",
+                            cls.gpgurls(),
+                        ),
+                        Repo(
+                            f"{repo.lower()}-source",
+                            f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/source/extras-common')}",
+                            cls.gpgurls(),
+                            enabled=False,
+                        ),
                     ]
 
                 return [
-                    Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/os')}", cls.gpgurls()),
-                    Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/debug/tree')}", cls.gpgurls(), enabled=False),
-                    Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/source/tree')}", cls.gpgurls(), enabled=False),
+                    Repo(
+                        repo.lower(),
+                        f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/os')}",
+                        cls.gpgurls(),
+                    ),
+                    Repo(
+                        f"{repo.lower()}-debuginfo",
+                        f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/debug/tree')}",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo.lower()}-source",
+                        f"baseurl={join_mirror(config, f'$stream/{repo}/source/tree')}",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
                 ]
         else:
             if int(config.release) <= 8:
                 return [
-                    Repo(repo.lower(), f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}", cls.gpgurls()),
+                    Repo(
+                        repo.lower(),
+                        f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}",
+                        cls.gpgurls(),
+                    ),
                     # These can't be retrieved from the mirrorlist.
-                    Repo(f"{repo.lower()}-debuginfo", "baseurl=http://debuginfo.centos.org/$stream/$basearch", cls.gpgurls(), enabled=False),
-                    Repo(f"{repo.lower()}-source", f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source", cls.gpgurls(), enabled=False),
+                    Repo(
+                        f"{repo.lower()}-debuginfo",
+                        "baseurl=http://debuginfo.centos.org/$stream/$basearch",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo.lower()}-source",
+                        f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
                 ]
             else:
                 url = "metalink=https://mirrors.centos.org/metalink"
 
                 if repo == "extras":
                     return [
-                        Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream", cls.gpgurls()),
-                        Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream", cls.gpgurls(), enabled=False),
+                        Repo(
+                            repo.lower(),
+                            f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream",
+                            cls.gpgurls(),
+                        ),
+                        Repo(
+                            f"{repo.lower()}-source",
+                            f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream",
+                            cls.gpgurls(),
+                            enabled=False,
+                        ),
                     ]
 
                 return [
                     Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-{repo.lower()}-$stream", cls.gpgurls()),
-                    Repo(f"{repo.lower()}-debuginfo", f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream", cls.gpgurls(), enabled=False),
-                    Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream", cls.gpgurls(), enabled=False),
+                    Repo(
+                        f"{repo.lower()}-debuginfo",
+                        f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo.lower()}-source",
+                        f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream",
+                        cls.gpgurls(),
+                        enabled=False,
+                    ),
                 ]
 
     @classmethod
@@ -200,11 +274,31 @@ class CentosInstaller(DistributionInstaller):
         repos = []
 
         if config.mirror:
-            for repo, dir in (("epel", "epel"), ("epel-next", "epel/next"), ("epel-testing", "epel/testing"), ("epel-next-testing", "epel/testing/next")):
+            for repo, dir in (
+                ("epel", "epel"),
+                ("epel-next", "epel/next"),
+                ("epel-testing", "epel/testing"),
+                ("epel-next-testing", "epel/testing/next"),
+            ):
                 repos += [
-                    Repo(repo, f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch')}", gpgurls, enabled=False),
-                    Repo(f"{repo}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch/debug')}", gpgurls, enabled=False),
-                    Repo(f"{repo}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/source/tree')}", gpgurls, enabled=False),
+                    Repo(
+                        repo,
+                        f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch')}",
+                        gpgurls,
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo}-debuginfo",
+                        f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch/debug')}",
+                        gpgurls,
+                        enabled=False,
+                    ),
+                    Repo(
+                        f"{repo}-source",
+                        f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/source/tree')}",
+                        gpgurls,
+                        enabled=False,
+                    ),
                 ]
         else:
             url = "metalink=https://mirrors.fedoraproject.org/metalink?arch=$basearch"
@@ -220,8 +314,18 @@ class CentosInstaller(DistributionInstaller):
                 Repo("epel-testing-debuginfo", f"{url}&repo=testing-debug-epel$releasever", gpgurls, enabled=False),
                 Repo("epel-testing-source", f"{url}&repo=testing-source-epel$releasever", gpgurls, enabled=False),
                 Repo("epel-next-testing", f"{url}&repo=epel-testing-next-$releasever", gpgurls, enabled=False),
-                Repo("epel-next-testing-debuginfo", f"{url}&repo=epel-testing-next-debug-$releasever", gpgurls, enabled=False),
-                Repo("epel-next-testing-source", f"{url}&repo=epel-testing-next-source-$releasever", gpgurls, enabled=False),
+                Repo(
+                    "epel-next-testing-debuginfo",
+                    f"{url}&repo=epel-testing-next-debug-$releasever",
+                    gpgurls,
+                    enabled=False,
+                ),
+                Repo(
+                    "epel-next-testing-source",
+                    f"{url}&repo=epel-testing-next-source-$releasever",
+                    gpgurls,
+                    enabled=False,
+                ),
             ]
 
         return repos
@@ -246,30 +350,90 @@ class CentosInstaller(DistributionInstaller):
                 if config.mirror:
                     if int(config.release) <= 8:
                         repos += [
-                            Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{sig}/$basearch')}", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/Source')}", gpgurls, enabled=False),
+                            Repo(
+                                f"{sig}-{c}",
+                                f"baseurl={join_mirror(config, f'centos/$stream/{sig}/$basearch/{c}')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-debuginfo",
+                                f"baseurl={join_mirror(config, f'$stream/{sig}/$basearch')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-source",
+                                f"baseurl={join_mirror(config, f'centos/$stream/{sig}/Source')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
                         ]
                     else:
                         repos += [
-                            Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/source/{c}')}", gpgurls, enabled=False),
+                            Repo(
+                                f"{sig}-{c}",
+                                f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-debuginfo",
+                                f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-source",
+                                f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/source/{c}')}",
+                                gpgurls,
+                                enabled=False,
+                            ),
                         ]
                 else:
                     if int(config.release) <= 8:
                         repos += [
-                            Repo(f"{sig}-{c}", f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}", gpgurls, enabled=False),
+                            Repo(
+                                f"{sig}-{c}",
+                                f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}",
+                                gpgurls,
+                                enabled=False,
+                            ),
                             # These can't be retrieved from the mirrorlist.
-                            Repo(f"{sig}-{c}-debuginfo", f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-source", f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}", gpgurls, enabled=False),
+                            Repo(
+                                f"{sig}-{c}-debuginfo",
+                                f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-source",
+                                f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}",
+                                gpgurls,
+                                enabled=False,
+                            ),
                         ]
                     else:
                         url = "metalink=https://mirrors.centos.org/metalink"
                         repos += [
-                            Repo(f"{sig}-{c}", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-debuginfo", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream", gpgurls, enabled=False),
-                            Repo(f"{sig}-{c}-source", f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream", gpgurls, enabled=False),
+                            Repo(
+                                f"{sig}-{c}",
+                                f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-debuginfo",
+                                f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream",
+                                gpgurls,
+                                enabled=False,
+                            ),
+                            Repo(
+                                f"{sig}-{c}-source",
+                                f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream",
+                                gpgurls,
+                                enabled=False,
+                            ),
                         ]
 
                     repos += [
index 4133cc6e3035325aa770492b292bc5a43ed17672..1bf86d53242657756b1fd5db5db6b14c6dd87bdc 100644 (file)
@@ -110,7 +110,12 @@ class FedoraInstaller(DistributionInstaller):
             if state.config.release != "rawhide":
                 repos += [
                     Repo("updates", f"{url}&repo=updates-released-f$releasever", gpgurls),
-                    Repo("updates-debuginfo", f"{url}&repo=updates-released-debug-f$releasever", gpgurls, enabled=False),
+                    Repo(
+                        "updates-debuginfo",
+                        f"{url}&repo=updates-released-debug-f$releasever",
+                        gpgurls,
+                        enabled=False,
+                    ),
                     Repo("updates-source", f"{url}&repo=updates-released-source-f$releasever", gpgurls, enabled=False),
                 ]
 
index 89d707b3892bd65046c150898523f2c5b767e51f..ff659b51c5c87c87aed5b1b256a0acd53cbe18f1 100644 (file)
@@ -95,7 +95,9 @@ class OpensuseInstaller(DistributionInstaller):
         else:
             repos = [Repo("repo-oss", f"baseurl={release_url}", fetch_gpgurls(release_url) if not zypper else ())]
             if updates_url is not None:
-                repos += [Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ())]
+                repos += [
+                    Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ())
+                ]
 
         if zypper:
             setup_zypper(state, repos)
index 50a0162c2a168dd64b05a7f838f16bd934fe7960..ca1f201952b446ec6317f90b526f46436b34580f 100644 (file)
@@ -60,7 +60,9 @@ def apt_cmd(state: MkosiState, command: str) -> list[PathString]:
     debarch = state.config.distribution.architecture(state.config.architecture)
 
     trustedkeys = state.pkgmngr / "etc/apt/trusted.gpg"
-    trustedkeys = trustedkeys if trustedkeys.exists() else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg"
+    trustedkeys = (
+        trustedkeys if trustedkeys.exists() else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg"
+    )
     trustedkeys_dir = state.pkgmngr / "etc/apt/trusted.gpg.d"
     trustedkeys_dir = trustedkeys_dir if trustedkeys_dir.exists() else "/usr/share/keyrings"
 
index eb72c758f89dcd37e909aee991b93d2f0488fc3b..a03a9a3cdfd4e63caa314366aaf66be37c9e8e53 100644 (file)
@@ -5,7 +5,8 @@ import contextvars
 import logging
 import os
 import sys
-from typing import Any, Iterator, NoReturn, Optional
+from collections.abc import Iterator
+from typing import Any, NoReturn, Optional
 
 # This global should be initialized after parsing arguments
 ARG_DEBUG = contextvars.ContextVar("debug", default=False)
index 0a5e5fc4a57df7946d3053894d32cb61603f2959..b88494973b485c86db032ef5cbb4eb412137ade5 100644 (file)
@@ -124,7 +124,13 @@ def mount_usr(tree: Optional[Path], umount: bool = True) -> Iterator[None]:
         # If we mounted over /usr, trying to use umount will fail with "target is busy", because umount is
         # being called from /usr, which we're trying to unmount. To work around this issue, we do a lazy
         # unmount.
-        with mount(what=tree / "usr", where=Path("/usr"), operation="--bind", read_only=True, umount=umount, lazy=True):
+        with mount(
+            what=tree / "usr",
+            where=Path("/usr"),
+            operation="--bind",
+            read_only=True,
+            umount=umount, lazy=True
+        ):
             yield
     finally:
         os.environ["PATH"] = old
index 13c51f6a882c72125c742766b6740ff43be7bba8..f8a4db1ee76ef2084ec8b853bf2cef483342f2da 100644 (file)
@@ -1,9 +1,9 @@
 import dataclasses
 import json
 import subprocess
-from collections.abc import Sequence
+from collections.abc import Mapping, Sequence
 from pathlib import Path
-from typing import Any, Mapping, Optional
+from typing import Any, Optional
 
 from mkosi.log import die
 from mkosi.run import run
index 8019bf4122912b60ee1ece15f24c1a4d605c01fe..5423df18064182e007ae0e2046908707bb3cf5b5 100644 (file)
@@ -12,8 +12,9 @@ import subprocess
 import sys
 import tempfile
 import uuid
+from collections.abc import Iterator
 from pathlib import Path
-from typing import Iterator, Optional
+from typing import Optional
 
 from mkosi.architecture import Architecture
 from mkosi.config import (
@@ -227,7 +228,11 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
         die(f"{config.output_format} images cannot be booted with the '{config.qemu_firmware}' firmware")
 
     accel = "tcg"
-    auto = config.qemu_kvm == ConfigFeature.auto and config.architecture.is_native() and qemu_check_kvm_support(log=True)
+    auto = (
+        config.qemu_kvm == ConfigFeature.auto and
+        config.architecture.is_native() and
+        qemu_check_kvm_support(log=True)
+    )
     if config.qemu_kvm == ConfigFeature.enabled or auto:
         accel = "kvm"
 
@@ -278,8 +283,13 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
 
     if config.architecture.supports_smbios():
         for k, v in config.credentials.items():
-            cmdline += ["-smbios", f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}"]
-        cmdline += ["-smbios", f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}"]
+            cmdline += [
+                "-smbios", f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}"
+            ]
+        cmdline += [
+            "-smbios",
+            f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}"
+        ]
 
     # QEMU has built-in logic to look for the BIOS firmware so we don't need to do anything special for that.
     if firmware == QemuFirmware.uefi:
@@ -334,7 +344,10 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
             elif "-kernel" not in args.cmdline:
                 kernel = config.output_dir / config.output_split_kernel
                 if not kernel.exists():
-                    die("No kernel found, please install a kernel in the image or provide a -kernel argument to mkosi qemu")
+                    die(
+                        "No kernel found, please install a kernel in the image "
+                        "or provide a -kernel argument to mkosi qemu"
+                    )
             else:
                 kernel = None
 
@@ -364,7 +377,11 @@ def run_qemu(args: MkosiArgs, config: MkosiConfig) -> None:
                         "-device", "virtio-scsi-pci,id=scsi",
                         "-device", f"scsi-{'cd' if config.qemu_cdrom else 'hd'},drive=mkosi,bootindex=1"]
 
-        if firmware == QemuFirmware.uefi and config.qemu_swtpm != ConfigFeature.disabled and shutil.which("swtpm") is not None:
+        if (
+            firmware == QemuFirmware.uefi and
+            config.qemu_swtpm != ConfigFeature.disabled and
+            shutil.which("swtpm") is not None
+        ):
             sock = stack.enter_context(start_swtpm())
             cmdline += ["-chardev", f"socket,id=chrtpm,path={sock}",
                         "-tpmdev", "emulator,id=tpm0,chardev=chrtpm"]
index 8f77a25980452d912c03297835678e42fcc890cc..0637124092384c9c08cf3024d4ec083c9ad49e1c 100644 (file)
@@ -17,9 +17,10 @@ import sys
 import tempfile
 import textwrap
 import threading
+from collections.abc import Awaitable, Mapping, Sequence
 from pathlib import Path
 from types import TracebackType
-from typing import Any, Awaitable, Mapping, Optional, Sequence, Tuple, Type
+from typing import Any, Optional
 
 from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die
 from mkosi.types import _FILE, CompletedProcess, PathString, Popen
@@ -58,7 +59,10 @@ def read_subrange(path: Path) -> int:
         die(f"No mapping found for {user or uid} in {path}")
 
     if int(count) < SUBRANGE:
-        die(f"subuid/subgid range length must be at least {SUBRANGE}, got {count} for {user or uid} from line '{line}'")
+        die(
+            f"subuid/subgid range length must be at least {SUBRANGE}, "
+            f"got {count} for {user or uid} from line '{line}'"
+        )
 
     return int(start)
 
@@ -142,7 +146,7 @@ def foreground(*, new_process_group: bool = True) -> None:
         signal.signal(signal.SIGTTOU, old)
 
 
-def ensure_exc_info() -> Tuple[Type[BaseException], BaseException, TracebackType]:
+def ensure_exc_info() -> tuple[type[BaseException], BaseException, TracebackType]:
     exctype, exc, tb = sys.exc_info()
     assert exctype
     assert exc
@@ -339,7 +343,8 @@ def bwrap(
             result = run([*cmdline, *cmd], env=env, log=False, stdin=stdin, stdout=stdout, input=input)
         except subprocess.CalledProcessError as e:
             if log:
-                logging.error(f"\"{shlex.join(os.fspath(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
+                c = shlex.join(os.fspath(s) for s in cmd)
+                logging.error(f"\"{c}\" returned non-zero exit code {e.returncode}.")
             if ARG_DEBUG_SHELL.get():
                 run([*cmdline, "sh"], stdin=sys.stdin, check=False, env=env, log=False)
             raise e
@@ -465,7 +470,7 @@ class MkosiAsyncioThread(threading.Thread):
 
     def __exit__(
         self,
-        type: Optional[Type[BaseException]],
+        type: Optional[type[BaseException]],
         value: Optional[BaseException],
         traceback: Optional[TracebackType],
     ) -> None:
index 11de8203a3cb2fe79ed6a781275a5f7645f6d234..3a8403117ad6c9dd2f429646e3300ff74744f795 100644 (file)
@@ -67,8 +67,7 @@ def sort_packages(packages: Iterable[str]) -> list[str]:
     """Sorts packages: normal first, paths second, conditional third"""
 
     m = {"(": 2, "/": 1}
-    sort = lambda name: (m.get(name[0], 0), name)
-    return sorted(packages, key=sort)
+    return sorted(packages, key=lambda name: (m.get(name[0], 0), name))
 
 
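Editor's note: the inlined key keeps the ordering the docstring describes. A usage sketch with hypothetical package names:

    from mkosi.util import sort_packages

    # Plain names sort first, paths second, conditional ("(...)") entries last.
    sort_packages(["(gdb if gcc)", "/usr/bin/vim", "bash"])
    # -> ['bash', '/usr/bin/vim', '(gdb if gcc)']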
 def flatten(lists: Iterable[Iterable[T]]) -> list[T]:
@@ -150,7 +149,10 @@ def qemu_check_vsock_support(log: bool) -> bool:
             return False
         elif e.errno in (errno.EPERM, errno.EACCES):
             if log:
-                logging.warning("Permission denied to access /dev/vhost-vsock. Not adding a vsock device to the virtual machine.")
+                logging.warning(
+                    "Permission denied to access /dev/vhost-vsock. "
+                    "Not adding a vsock device to the virtual machine."
+                )
             return False
 
         raise e
index 7f4608a277958211242349ecdb415be4e19edb33..1ecdf3c9c0b0842023198f8c0cba5697cad50d0d 100644 (file)
@@ -26,13 +26,13 @@ def test_compression_enum_creation() -> None:
 
 
 def test_compression_enum_bool() -> None:
-    assert bool(Compression.none) == False
-    assert bool(Compression.zst)  == True
-    assert bool(Compression.xz)   == True
-    assert bool(Compression.bz2)  == True
-    assert bool(Compression.gz)   == True
-    assert bool(Compression.lz4)  == True
-    assert bool(Compression.lzma) == True
+    assert not bool(Compression.none)
+    assert bool(Compression.zst)
+    assert bool(Compression.xz)
+    assert bool(Compression.bz2)
+    assert bool(Compression.gz)
+    assert bool(Compression.lz4)
+    assert bool(Compression.lzma)
 
 
 def test_compression_enum_str() -> None:
index b6f8e8c94f8e143ed19dfe86a52d92e689d5586f..98ec9d2849c64cf2e539305e006e56bda6ec5708 100644 (file)
@@ -67,8 +67,14 @@ def test_generic_version_spec() -> None:
         2
     )
 )
-def test_generic_version_strverscmp_improved_doc(s1: tuple[int, GenericVersion], s2: tuple[int, GenericVersion]) -> None:
-    """Example from the doc string of strverscmp_improved in systemd/src/fundamental/string-util-fundamental.c"""
+def test_generic_version_strverscmp_improved_doc(
+    s1: tuple[int, GenericVersion],
+    s2: tuple[int, GenericVersion],
+) -> None:
+    """Example from the doc string of strverscmp_improved.
+
+    strverscmp_improved can be found in systemd/src/fundamental/string-util-fundamental.c
+    """
     i1, v1 = s1
     i2, v2 = s2
     assert (v1 == v2) == (i1 == i2)