import uuid
from collections.abc import Iterator, Sequence
from pathlib import Path
-from typing import ContextManager, Optional, TextIO, Union
+from typing import Optional, TextIO, Union
from mkosi.archive import extract_tar, make_cpio, make_tar
from mkosi.config import (
if not need_build_packages(state.config):
return
- with complete_step(f"Installing build packages for {str(state.config.distribution).capitalize()}"), mount_build_overlay(state):
+ # TODO: move to parenthesised context managers once on 3.10
+ pd = str(state.config.distribution).capitalize()
+ with complete_step(f"Installing build packages for {pd}"), mount_build_overlay(state):
state.config.distribution.install_packages(state, state.config.build_packages)
yield
-def mount_build_overlay(state: MkosiState, read_only: bool = False) -> ContextManager[Path]:
+def mount_build_overlay(state: MkosiState, read_only: bool = False) -> contextlib.AbstractContextManager[Path]:
d = state.workspace / "build-overlay"
if not d.is_symlink():
with umask(~0o755):
def find_grub_binary(state: MkosiState, binary: str) -> Optional[Path]:
path = ":".join(os.fspath(p) for p in [state.root / "usr/bin", state.root / "usr/sbin"])
- assert "grub" in binary and not "grub2" in binary
+ assert "grub" in binary and "grub2" not in binary
path = shutil.which(binary, path=path) or shutil.which(binary.replace("grub", "grub2"), path=path)
if not path:
kimg = Path(shutil.copy2(state.root / kimg, kdst / "vmlinuz"))
kmods = Path(shutil.copy2(kmods, kdst / "kmods"))
+ distribution = state.config.distribution
+ image = kimg.relative_to(state.root / "efi")
+ cmdline = " ".join(state.config.kernel_command_line)
+ initrd = initrd.relative_to(state.root / "efi")
+ kmods = kmods.relative_to(state.root / "efi")
+
f.write(
textwrap.dedent(
f"""\
- menuentry "{state.config.distribution}-{kver}" {{
- linux /{kimg.relative_to(state.root / "efi")} {root} {" ".join(state.config.kernel_command_line)}
- initrd /{initrd.relative_to(state.root / "efi")} /{kmods.relative_to(state.root / "efi")}
+ menuentry "{distribution}-{kver}" {{
+ linux /{image} {root} {cmdline}
+ initrd /{initrd} /{kmods}
}}
"""
)
"--make-initrd", "yes",
"--bootable", "no",
"--manifest-format", "",
- *(["--source-date-epoch", str(state.config.source_date_epoch)] if state.config.source_date_epoch is not None else []),
+            *(
+                ["--source-date-epoch", str(state.config.source_date_epoch)]
+                if state.config.source_date_epoch is not None
+                else []
+            ),
*(["--locale", state.config.locale] if state.config.locale else []),
*(["--locale-messages", state.config.locale_messages] if state.config.locale_messages else []),
*(["--keymap", state.config.keymap] if state.config.keymap else []),
shutil.copy(state.root / kimg, state.staging / state.config.output_split_kernel)
break
- if state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and state.config.bootable == ConfigFeature.auto:
+ if (
+ state.config.output_format in (OutputFormat.cpio, OutputFormat.uki) and
+ state.config.bootable == ConfigFeature.auto
+ ):
return
if state.config.architecture.to_efi() is None and state.config.bootable == ConfigFeature.auto:
if state.config.bootloader == Bootloader.uki:
boot_binary = state.root / "efi/EFI/BOOT/BOOTX64.EFI"
elif state.config.image_version:
- boot_binary = state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi"
+ boot_binary = (
+ state.root / f"efi/EFI/Linux/{image_id}_{state.config.image_version}-{kver}{boot_count}.efi"
+ )
elif roothash:
_, _, h = roothash.partition("=")
boot_binary = state.root / f"efi/EFI/Linux/{image_id}-{kver}-{h}{boot_count}.efi"
# Set the path of the keyring to use based on the environment if possible and fallback to the default
# path. Without this the keyring for the root user will instead be used which will fail for a
# non-root build.
- env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(((Path(os.environ["HOME"]) / ".gnupg")))))
+ env = dict(GNUPGHOME=os.environ.get("GNUPGHOME", os.fspath(Path(os.environ["HOME"]) / ".gnupg")))
if sys.stderr.isatty():
env |= dict(GPGTTY=os.ttyname(sys.stderr.fileno()))
except ValueError:
new_version = version + ".2"
logging.info(
- f"Last component of current version is not a decimal integer, appending '.2', bumping '{version}' → '{new_version}'."
+ "Last component of current version is not a decimal integer, "
+ f"appending '.2', bumping '{version}' → '{new_version}'."
)
else:
new_version = ".".join(v[:-1] + [str(m + 1)])
def needs_build(args: MkosiArgs, config: MkosiConfig) -> bool:
- return args.verb.needs_build() and (args.force > 0 or not (config.output_dir / config.output_with_compression).exists())
+ return (
+ args.verb.needs_build() and
+ (args.force > 0 or not (config.output_dir / config.output_with_compression).exists())
+ )
@contextlib.contextmanager
"--incremental", str(p.incremental),
"--acl", str(p.acl),
"--format", "directory",
- *flatten(["--package", package] for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)),
+            *flatten(
+                ["--package", package]
+                for package in itertools.chain(distribution.tools_tree_packages(), p.tools_tree_packages)
+            ),
"--output", f"{distribution}-tools",
"--bootable", "no",
"--manifest-format", "",
continue
(Path("/") / subdir).mkdir(parents=True, exist_ok=True)
- stack.enter_context(mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True))
+ stack.enter_context(
+ mount(what=tree / subdir, where=Path("/") / subdir, operation="--bind", read_only=True)
+ )
yield
import subprocess
import textwrap
import uuid
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable, Iterator, Sequence
from pathlib import Path
-from typing import Any, Callable, Iterator, Optional, Type, Union, cast
+from typing import Any, Callable, Optional, Union, cast
from mkosi.architecture import Architecture
from mkosi.distributions import Distribution, detect_distribution
def config_default_compression(namespace: argparse.Namespace) -> Compression:
if namespace.output_format in (OutputFormat.cpio, OutputFormat.uki):
- return Compression.xz if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8 else Compression.zst
+ if namespace.distribution.is_centos_variant() and int(namespace.release) <= 8:
+ return Compression.xz
+ else:
+ return Compression.zst
else:
return Compression.none
return config_parse_source_date_epoch(os.environ.get("SOURCE_DATE_EPOCH"), None)
-def make_enum_parser(type: Type[enum.Enum]) -> Callable[[str], enum.Enum]:
+def make_enum_parser(type: type[enum.Enum]) -> Callable[[str], enum.Enum]:
def parse_enum(value: str) -> enum.Enum:
try:
return type(value)
return parse_enum
-def config_make_enum_parser(type: Type[enum.Enum]) -> ConfigParseCallback:
+def config_make_enum_parser(type: type[enum.Enum]) -> ConfigParseCallback:
def config_parse_enum(value: Optional[str], old: Optional[enum.Enum]) -> Optional[enum.Enum]:
return make_enum_parser(type)(value) if value else None
return config_parse_enum
-def config_make_enum_matcher(type: Type[enum.Enum]) -> ConfigMatchCallback:
+def config_make_enum_matcher(type: type[enum.Enum]) -> ConfigMatchCallback:
def config_match_enum(match: str, value: enum.Enum) -> bool:
return make_enum_parser(type)(match) == value
subsequent_indent=subindent) for line in lines)
-def config_make_action(settings: Sequence[MkosiConfigSetting]) -> Type[argparse.Action]:
+def config_make_action(settings: Sequence[MkosiConfigSetting]) -> type[argparse.Action]:
lookup = {s.dest: s for s in settings}
class MkosiAction(argparse.Action):
setting: Optional[str] = None
value: Optional[str] = None
- for l in textwrap.dedent(path.read_text()).splitlines():
+ for line in textwrap.dedent(path.read_text()).splitlines():
# Systemd unit files allow both '#' and ';' to indicate comments so we do the same.
for c in ("#", ";"):
- comment = l.find(c)
+ comment = line.find(c)
if comment >= 0:
- l = l[:comment]
+ line = line[:comment]
- if not l.strip():
+ if not line.strip():
continue
# If we have a section, setting and value, any line that's indented is considered part of the
# setting's value.
- if section and setting and value is not None and l[0].isspace():
- value = f"{value}\n{l.strip()}"
+ if section and setting and value is not None and line[0].isspace():
+ value = f"{value}\n{line.strip()}"
continue
# So the line is not indented, that means we either found a new section or a new setting. Either way,
yield section, setting, value
setting = value = None
- l = l.strip()
+ line = line.strip()
- if l[0] == '[':
- if l[-1] != ']':
- die(f"{l} is not a valid section")
+ if line[0] == '[':
+ if line[-1] != ']':
+ die(f"{line} is not a valid section")
- section = l[1:-1].strip()
+ section = line[1:-1].strip()
if not section:
die("Section name cannot be empty or whitespace")
continue
if not section:
- die(f"Setting {l} is located outside of section")
+ die(f"Setting {line} is located outside of section")
if only_sections and section not in only_sections:
continue
- setting, delimiter, value = l.partition("=")
+ setting, delimiter, value = line.partition("=")
if not delimiter:
die(f"Setting {setting} must be followed by '='")
if not setting:
- die(f"Missing setting name before '=' in {l}")
+ die(f"Missing setting name before '=' in {line}")
setting = setting.strip()
value = value.strip()
if path.exists():
logging.debug(f"Including configuration file {Path.cwd() / path}")
- for section, k, v in parse_ini(path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"]):
+ for section, k, v in parse_ini(
+ path, only_sections=["Distribution", "Output", "Content", "Validation", "Host", "Preset"]
+ ):
ns = defaults if k.startswith("@") else namespace
if not (s := settings_lookup_by_name.get(k.removeprefix("@"))):
if "firstboot.locale" not in creds:
creds["firstboot.locale"] = "C.UTF-8"
- if args.ssh and "ssh.authorized_keys.root" not in creds and "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add"):
+ if (
+ args.ssh and
+ "ssh.authorized_keys.root" not in creds and
+ "SSH_AUTH_SOCK" in os.environ and shutil.which("ssh-add")
+ ):
key = run(
["ssh-add", "-L"],
stdout=subprocess.PIPE,
if args.incremental and not args.cache_dir:
die("A cache directory must be configured in order to use --incremental")
- # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available in Linux v5.11 and later.
- if (args.build_script is not None or args.base_trees) and GenericVersion(platform.release()) < GenericVersion("5.11") and os.geteuid() != 0:
+ # For unprivileged builds we need the userxattr OverlayFS mount option, which is only available
+ # in Linux v5.11 and later.
+ if (
+ (args.build_script is not None or args.base_trees) and
+ GenericVersion(platform.release()) < GenericVersion("5.11") and
+ os.geteuid() != 0
+ ):
die("This unprivileged build configuration requires at least Linux v5.11")
return MkosiConfig.from_namespace(args)
def summary(args: MkosiArgs, config: MkosiConfig) -> str:
- b = Style.bold
- e = Style.reset
- bold: Callable[..., str] = lambda s: f"{b}{s}{e}"
+ def bold(s: Any) -> str:
+ return f"{Style.bold}{s}{Style.reset}"
maniformats = (" ".join(i.name for i in config.manifest_format)) or "(none)"
env = [f"{k}={v}" for k, v in config.environment.items()]
import importlib
import re
from collections.abc import Sequence
-from typing import TYPE_CHECKING, Optional, Type, cast
+from typing import TYPE_CHECKING, Optional, cast
from mkosi.architecture import Architecture
from mkosi.log import die
def tools_tree_packages(self) -> list[str]:
return self.installer().tools_tree_packages()
- def installer(self) -> Type[DistributionInstaller]:
+ def installer(self) -> type[DistributionInstaller]:
try:
mod = importlib.import_module(f"mkosi.distributions.{self}")
installer = getattr(mod, f"{str(self).title().replace('_','')}Installer")
if not issubclass(installer, DistributionInstaller):
die(f"Distribution installer for {self} is not a subclass of DistributionInstaller")
- return cast(Type[DistributionInstaller], installer)
+ return cast(type[DistributionInstaller], installer)
except (ImportError, AttributeError):
die("No installer for this distribution.")
newdb.symlink_to(os.path.relpath(olddb, start=newdb.parent))
-class CentosInstaller(DistributionInstaller):
+def join_mirror(config: MkosiConfig, link: str) -> str:
+ assert config.mirror is not None
+ return urllib.parse.urljoin(config.mirror, link)
+
+class CentosInstaller(DistributionInstaller):
@classmethod
def filesystem(cls) -> str:
return "xfs"
if config.mirror:
if int(config.release) <= 8:
return [
- Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/$basearch/os')}", cls.gpgurls()),
- Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, 'centos-debuginfo/$stream/$basearch')}", cls.gpgurls(), enabled=False),
- Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{repo}/Source')}", cls.gpgurls(), enabled=False),
+ Repo(
+ repo.lower(),
+ f"baseurl={join_mirror(config, f'centos/$stream/{repo}/$basearch/os')}",
+ cls.gpgurls()
+ ),
+ Repo(
+ f"{repo.lower()}-debuginfo",
+ f"baseurl={join_mirror(config, 'centos-debuginfo/$stream/$basearch')}",
+ cls.gpgurls(),
+ enabled=False,
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"baseurl={join_mirror(config, f'centos/$stream/{repo}/Source')}",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
else:
if repo == "extras":
return [
- Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/$basearch/extras-common')}", cls.gpgurls()),
- Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{repo}/source/extras-common')}", cls.gpgurls(), enabled=False),
+ Repo(
+ repo.lower(),
+ f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/$basearch/extras-common')}",
+ cls.gpgurls(),
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"baseurl={join_mirror(config, f'SIGs/$stream/{repo}/source/extras-common')}",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
return [
- Repo(repo.lower(), f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/os')}", cls.gpgurls()),
- Repo(f"{repo.lower()}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/$basearch/debug/tree')}", cls.gpgurls(), enabled=False),
- Repo(f"{repo.lower()}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{repo}/source/tree')}", cls.gpgurls(), enabled=False),
+ Repo(
+ repo.lower(),
+ f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/os')}",
+ cls.gpgurls(),
+ ),
+ Repo(
+ f"{repo.lower()}-debuginfo",
+ f"baseurl={join_mirror(config, f'$stream/{repo}/$basearch/debug/tree')}",
+ cls.gpgurls(),
+ enabled=False,
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"baseurl={join_mirror(config, f'$stream/{repo}/source/tree')}",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
else:
if int(config.release) <= 8:
return [
- Repo(repo.lower(), f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}", cls.gpgurls()),
+ Repo(
+ repo.lower(),
+ f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={repo}",
+ cls.gpgurls(),
+ ),
# These can't be retrieved from the mirrorlist.
- Repo(f"{repo.lower()}-debuginfo", "baseurl=http://debuginfo.centos.org/$stream/$basearch", cls.gpgurls(), enabled=False),
- Repo(f"{repo.lower()}-source", f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source", cls.gpgurls(), enabled=False),
+ Repo(
+ f"{repo.lower()}-debuginfo",
+ "baseurl=http://debuginfo.centos.org/$stream/$basearch",
+ cls.gpgurls(),
+ enabled=False,
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"baseurl=https://vault.centos.org/centos/$stream/{repo}/Source",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
else:
url = "metalink=https://mirrors.centos.org/metalink"
if repo == "extras":
return [
- Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream", cls.gpgurls()),
- Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream", cls.gpgurls(), enabled=False),
+ Repo(
+ repo.lower(),
+ f"{url}?arch=$basearch&repo=centos-extras-sig-extras-common-$stream",
+ cls.gpgurls(),
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"{url}?arch=source&repo=centos-extras-sig-extras-common-source-$stream",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
return [
Repo(repo.lower(), f"{url}?arch=$basearch&repo=centos-{repo.lower()}-$stream", cls.gpgurls()),
- Repo(f"{repo.lower()}-debuginfo", f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream", cls.gpgurls(), enabled=False),
- Repo(f"{repo.lower()}-source", f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream", cls.gpgurls(), enabled=False),
+ Repo(
+ f"{repo.lower()}-debuginfo",
+ f"{url}?arch=$basearch&repo=centos-{repo.lower()}-debug-$stream",
+ cls.gpgurls(),
+ enabled=False,
+ ),
+ Repo(
+ f"{repo.lower()}-source",
+ f"{url}?arch=source&repo=centos-{repo.lower()}-source-$stream",
+ cls.gpgurls(),
+ enabled=False,
+ ),
]
@classmethod
repos = []
if config.mirror:
- for repo, dir in (("epel", "epel"), ("epel-next", "epel/next"), ("epel-testing", "epel/testing"), ("epel-next-testing", "epel/testing/next")):
+ for repo, dir in (
+ ("epel", "epel"),
+ ("epel-next", "epel/next"),
+ ("epel-testing", "epel/testing"),
+ ("epel-next-testing", "epel/testing/next")
+ ):
repos += [
- Repo(repo, f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch')}", gpgurls, enabled=False),
- Repo(f"{repo}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/$basearch/debug')}", gpgurls, enabled=False),
- Repo(f"{repo}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'{dir}/$releasever/Everything/source/tree')}", gpgurls, enabled=False),
+ Repo(
+ repo,
+ f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{repo}-debuginfo",
+ f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/$basearch/debug')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{repo}-source",
+ f"baseurl={join_mirror(config, f'{dir}/$releasever/Everything/source/tree')}",
+ gpgurls,
+ enabled=False,
+ ),
]
else:
url = "metalink=https://mirrors.fedoraproject.org/metalink?arch=$basearch"
Repo("epel-testing-debuginfo", f"{url}&repo=testing-debug-epel$releasever", gpgurls, enabled=False),
Repo("epel-testing-source", f"{url}&repo=testing-source-epel$releasever", gpgurls, enabled=False),
Repo("epel-next-testing", f"{url}&repo=epel-testing-next-$releasever", gpgurls, enabled=False),
- Repo("epel-next-testing-debuginfo", f"{url}&repo=epel-testing-next-debug-$releasever", gpgurls, enabled=False),
- Repo("epel-next-testing-source", f"{url}&repo=epel-testing-next-source-$releasever", gpgurls, enabled=False),
+ Repo(
+ "epel-next-testing-debuginfo",
+ f"{url}&repo=epel-testing-next-debug-$releasever",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ "epel-next-testing-source",
+ f"{url}&repo=epel-testing-next-source-$releasever",
+ gpgurls,
+ enabled=False,
+ ),
]
return repos
if config.mirror:
if int(config.release) <= 8:
repos += [
- Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'$stream/{sig}/$basearch')}", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'centos/$stream/{sig}/Source')}", gpgurls, enabled=False),
+ Repo(
+ f"{sig}-{c}",
+ f"baseurl={join_mirror(config, f'centos/$stream/{sig}/$basearch/{c}')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-debuginfo",
+ f"baseurl={join_mirror(config, f'$stream/{sig}/$basearch')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-source",
+ f"baseurl={join_mirror(config, f'centos/$stream/{sig}/Source')}",
+ gpgurls,
+ enabled=False,
+ ),
]
else:
repos += [
- Repo(f"{sig}-{c}", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}')}", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-debuginfo", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-source", f"baseurl={urllib.parse.urljoin(config.mirror, f'SIGs/$stream/{sig}/source/{c}')}", gpgurls, enabled=False),
+ Repo(
+ f"{sig}-{c}",
+ f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-debuginfo",
+ f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/$basearch/{c}/debug')}",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-source",
+ f"baseurl={join_mirror(config, f'SIGs/$stream/{sig}/source/{c}')}",
+ gpgurls,
+ enabled=False,
+ ),
]
else:
if int(config.release) <= 8:
repos += [
- Repo(f"{sig}-{c}", f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}", gpgurls, enabled=False),
+ Repo(
+ f"{sig}-{c}",
+ f"mirrorlist=http://mirrorlist.centos.org/?release=$stream&arch=$basearch&repo={sig}-{c}",
+ gpgurls,
+ enabled=False,
+ ),
# These can't be retrieved from the mirrorlist.
- Repo(f"{sig}-{c}-debuginfo", f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-source", f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}", gpgurls, enabled=False),
+ Repo(
+ f"{sig}-{c}-debuginfo",
+ f"baseurl=http://debuginfo.centos.org/centos/$stream/{sig}/$basearch",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-source",
+ f"baseurl=https://vault.centos.org/$stream/{sig}/Source/{c}",
+ gpgurls,
+ enabled=False,
+ ),
]
else:
url = "metalink=https://mirrors.centos.org/metalink"
repos += [
- Repo(f"{sig}-{c}", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-debuginfo", f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream", gpgurls, enabled=False),
- Repo(f"{sig}-{c}-source", f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream", gpgurls, enabled=False),
+ Repo(
+ f"{sig}-{c}",
+ f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-$stream",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-debuginfo",
+ f"{url}?arch=$basearch&repo=centos-{sig}-sig-{c}-debug-$stream",
+ gpgurls,
+ enabled=False,
+ ),
+ Repo(
+ f"{sig}-{c}-source",
+ f"{url}?arch=source&repo=centos-{sig}-sig-{c}-source-$stream",
+ gpgurls,
+ enabled=False,
+ ),
]
repos += [
if state.config.release != "rawhide":
repos += [
Repo("updates", f"{url}&repo=updates-released-f$releasever", gpgurls),
- Repo("updates-debuginfo", f"{url}&repo=updates-released-debug-f$releasever", gpgurls, enabled=False),
+ Repo(
+ "updates-debuginfo",
+ f"{url}&repo=updates-released-debug-f$releasever",
+ gpgurls,
+ enabled=False,
+ ),
Repo("updates-source", f"{url}&repo=updates-released-source-f$releasever", gpgurls, enabled=False),
]
else:
repos = [Repo("repo-oss", f"baseurl={release_url}", fetch_gpgurls(release_url) if not zypper else ())]
if updates_url is not None:
- repos += [Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ())]
+ repos += [
+ Repo("repo-update", f"baseurl={updates_url}", fetch_gpgurls(updates_url) if not zypper else ())
+ ]
if zypper:
setup_zypper(state, repos)
debarch = state.config.distribution.architecture(state.config.architecture)
trustedkeys = state.pkgmngr / "etc/apt/trusted.gpg"
- trustedkeys = trustedkeys if trustedkeys.exists() else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg"
+    trustedkeys = (
+        trustedkeys
+        if trustedkeys.exists()
+        else f"/usr/share/keyrings/{state.config.distribution}-archive-keyring.gpg"
+    )
trustedkeys_dir = state.pkgmngr / "etc/apt/trusted.gpg.d"
trustedkeys_dir = trustedkeys_dir if trustedkeys_dir.exists() else "/usr/share/keyrings"
import logging
import os
import sys
-from typing import Any, Iterator, NoReturn, Optional
+from collections.abc import Iterator
+from typing import Any, NoReturn, Optional
# This global should be initialized after parsing arguments
ARG_DEBUG = contextvars.ContextVar("debug", default=False)
# If we mounted over /usr, trying to use umount will fail with "target is busy", because umount is
# being called from /usr, which we're trying to unmount. To work around this issue, we do a lazy
# unmount.
- with mount(what=tree / "usr", where=Path("/usr"), operation="--bind", read_only=True, umount=umount, lazy=True):
+    with mount(
+        what=tree / "usr",
+        where=Path("/usr"),
+        operation="--bind",
+        read_only=True,
+        umount=umount,
+        lazy=True,
+    ):
yield
finally:
os.environ["PATH"] = old
import dataclasses
import json
import subprocess
-from collections.abc import Sequence
+from collections.abc import Mapping, Sequence
from pathlib import Path
-from typing import Any, Mapping, Optional
+from typing import Any, Optional
from mkosi.log import die
from mkosi.run import run
import sys
import tempfile
import uuid
+from collections.abc import Iterator
from pathlib import Path
-from typing import Iterator, Optional
+from typing import Optional
from mkosi.architecture import Architecture
from mkosi.config import (
die(f"{config.output_format} images cannot be booted with the '{config.qemu_firmware}' firmware")
accel = "tcg"
- auto = config.qemu_kvm == ConfigFeature.auto and config.architecture.is_native() and qemu_check_kvm_support(log=True)
+ auto = (
+ config.qemu_kvm == ConfigFeature.auto and
+ config.architecture.is_native() and
+ qemu_check_kvm_support(log=True)
+ )
if config.qemu_kvm == ConfigFeature.enabled or auto:
accel = "kvm"
if config.architecture.supports_smbios():
for k, v in config.credentials.items():
- cmdline += ["-smbios", f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}"]
- cmdline += ["-smbios", f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}"]
+            cmdline += [
+                "-smbios",
+                f"type=11,value=io.systemd.credential.binary:{k}={base64.b64encode(v.encode()).decode()}",
+            ]
+ cmdline += [
+ "-smbios",
+ f"type=11,value=io.systemd.stub.kernel-cmdline-extra={' '.join(config.kernel_command_line_extra)}"
+ ]
# QEMU has built-in logic to look for the BIOS firmware so we don't need to do anything special for that.
if firmware == QemuFirmware.uefi:
elif "-kernel" not in args.cmdline:
kernel = config.output_dir / config.output_split_kernel
if not kernel.exists():
- die("No kernel found, please install a kernel in the image or provide a -kernel argument to mkosi qemu")
+ die(
+ "No kernel found, please install a kernel in the image "
+ "or provide a -kernel argument to mkosi qemu"
+ )
else:
kernel = None
"-device", "virtio-scsi-pci,id=scsi",
"-device", f"scsi-{'cd' if config.qemu_cdrom else 'hd'},drive=mkosi,bootindex=1"]
- if firmware == QemuFirmware.uefi and config.qemu_swtpm != ConfigFeature.disabled and shutil.which("swtpm") is not None:
+ if (
+ firmware == QemuFirmware.uefi and
+ config.qemu_swtpm != ConfigFeature.disabled and
+ shutil.which("swtpm") is not None
+ ):
sock = stack.enter_context(start_swtpm())
cmdline += ["-chardev", f"socket,id=chrtpm,path={sock}",
"-tpmdev", "emulator,id=tpm0,chardev=chrtpm"]
import tempfile
import textwrap
import threading
+from collections.abc import Awaitable, Mapping, Sequence
from pathlib import Path
from types import TracebackType
-from typing import Any, Awaitable, Mapping, Optional, Sequence, Tuple, Type
+from typing import Any, Optional
from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die
from mkosi.types import _FILE, CompletedProcess, PathString, Popen
die(f"No mapping found for {user or uid} in {path}")
if int(count) < SUBRANGE:
- die(f"subuid/subgid range length must be at least {SUBRANGE}, got {count} for {user or uid} from line '{line}'")
+ die(
+ f"subuid/subgid range length must be at least {SUBRANGE}, "
+ f"got {count} for {user or uid} from line '{line}'"
+ )
return int(start)
signal.signal(signal.SIGTTOU, old)
-def ensure_exc_info() -> Tuple[Type[BaseException], BaseException, TracebackType]:
+def ensure_exc_info() -> tuple[type[BaseException], BaseException, TracebackType]:
exctype, exc, tb = sys.exc_info()
assert exctype
assert exc
result = run([*cmdline, *cmd], env=env, log=False, stdin=stdin, stdout=stdout, input=input)
except subprocess.CalledProcessError as e:
if log:
- logging.error(f"\"{shlex.join(os.fspath(s) for s in cmd)}\" returned non-zero exit code {e.returncode}.")
+ c = shlex.join(os.fspath(s) for s in cmd)
+ logging.error(f"\"{c}\" returned non-zero exit code {e.returncode}.")
if ARG_DEBUG_SHELL.get():
run([*cmdline, "sh"], stdin=sys.stdin, check=False, env=env, log=False)
raise e
def __exit__(
self,
- type: Optional[Type[BaseException]],
+ type: Optional[type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Sorts packages: normal first, paths second, conditional third"""
m = {"(": 2, "/": 1}
- sort = lambda name: (m.get(name[0], 0), name)
- return sorted(packages, key=sort)
+ return sorted(packages, key=lambda name: (m.get(name[0], 0), name))
def flatten(lists: Iterable[Iterable[T]]) -> list[T]:
return False
elif e.errno in (errno.EPERM, errno.EACCES):
if log:
- logging.warning("Permission denied to access /dev/vhost-vsock. Not adding a vsock device to the virtual machine.")
+ logging.warning(
+ "Permission denied to access /dev/vhost-vsock. "
+ "Not adding a vsock device to the virtual machine."
+ )
return False
raise e
def test_compression_enum_bool() -> None:
- assert bool(Compression.none) == False
- assert bool(Compression.zst) == True
- assert bool(Compression.xz) == True
- assert bool(Compression.bz2) == True
- assert bool(Compression.gz) == True
- assert bool(Compression.lz4) == True
- assert bool(Compression.lzma) == True
+ assert not bool(Compression.none)
+ assert bool(Compression.zst)
+ assert bool(Compression.xz)
+ assert bool(Compression.bz2)
+ assert bool(Compression.gz)
+ assert bool(Compression.lz4)
+ assert bool(Compression.lzma)
def test_compression_enum_str() -> None:
2
)
)
-def test_generic_version_strverscmp_improved_doc(s1: tuple[int, GenericVersion], s2: tuple[int, GenericVersion]) -> None:
- """Example from the doc string of strverscmp_improved in systemd/src/fundamental/string-util-fundamental.c"""
+def test_generic_version_strverscmp_improved_doc(
+ s1: tuple[int, GenericVersion],
+ s2: tuple[int, GenericVersion],
+) -> None:
+ """Example from the doc string of strverscmp_improved.
+
+ strverscmp_improved can be found in systemd/src/fundamental/string-util-fundamental.c
+ """
i1, v1 = s1
i2, v2 = s2
assert (v1 == v2) == (i1 == i2)