from collections.abc import Collection, Iterable, Iterator, Sequence
from contextlib import AbstractContextManager
from pathlib import Path
-from typing import Any, Callable, ClassVar, Generic, Optional, Protocol, TypeVar, Union, cast
+from typing import Any, Callable, ClassVar, Generic, Protocol, TypeVar, cast
from mkosi.distribution import Distribution, detect_distribution
from mkosi.log import ARG_DEBUG, ARG_DEBUG_SANDBOX, ARG_DEBUG_SHELL, complete_step, die
D = TypeVar("D", bound=DataclassInstance)
SE = TypeVar("SE", bound=StrEnum)
-ConfigParseCallback = Callable[[Optional[str], Optional[T]], Optional[T]]
+ConfigParseCallback = Callable[[str | None, T | None], T | None]
ConfigMatchCallback = Callable[[str, T], bool]
ConfigDefaultCallback = Callable[[dict[str, Any]], T]
@dataclasses.dataclass(frozen=True)
class ConfigTree:
source: Path
- target: Optional[Path]
+ target: Path | None
def with_prefix(self, prefix: PathString = "/") -> tuple[Path, Path]:
return (
class Drive:
id: str
size: int
- directory: Optional[Path]
- options: Optional[str]
+ directory: Path | None
+ options: str | None
file_id: str
flags: list[DriveFlag]
return a
- def to_efi(self) -> Optional[str]:
+ def to_efi(self) -> str | None:
return {
Architecture.x86: "ia32",
Architecture.x86_64: "x64",
Architecture.loongarch64: "loongarch64",
}.get(self) # fmt: skip
- def to_grub(self) -> Optional[str]:
+ def to_grub(self) -> str | None:
return {
Architecture.x86_64: "x86_64",
Architecture.x86: "i386",
return re.sub(r"&(?P<specifier>[&a-zA-Z])", replacer, text)
-def try_parse_boolean(s: str) -> Optional[bool]:
+def try_parse_boolean(s: str) -> bool | None:
"Parse 1/true/yes/y/t/on as true and 0/false/no/n/f/off/None as false"
s_l = s.lower()
return sorted(parse_path(os.fspath(p), resolve=resolve, secret=secret) for p in path.glob(glob))
-def config_parse_key(value: Optional[str], old: Optional[str]) -> Optional[Path]:
+def config_parse_key(value: str | None, old: str | None) -> Path | None:
if not value:
return None
return parse_path(value, secret=True) if Path(value).exists() else Path(value)
-def config_parse_certificate(value: Optional[str], old: Optional[str]) -> Optional[Path]:
+def config_parse_certificate(value: str | None, old: str | None) -> Path | None:
if not value:
return None
return config_match_list
-def config_parse_string(value: Optional[str], old: Optional[str]) -> Optional[str]:
+def config_parse_string(value: str | None, old: str | None) -> str | None:
return value or None
return value.get(k, None) == v
-def config_parse_boolean(value: Optional[str], old: Optional[bool]) -> Optional[bool]:
+def config_parse_boolean(value: str | None, old: bool | None) -> bool | None:
if value is None:
return False
return ConfigFeature.enabled if parse_boolean(value) else ConfigFeature.disabled
-def config_parse_feature(value: Optional[str], old: Optional[ConfigFeature]) -> Optional[ConfigFeature]:
+def config_parse_feature(value: str | None, old: ConfigFeature | None) -> ConfigFeature | None:
if value is None:
return ConfigFeature.auto
return value == parse_feature(match)
-def config_parse_compression(value: Optional[str], old: Optional[Compression]) -> Optional[Compression]:
+def config_parse_compression(value: str | None, old: Compression | None) -> Compression | None:
if not value:
return None
return Compression.zstd if parse_boolean(value) else Compression.none
-def config_parse_uuid(value: Optional[str], old: Optional[str]) -> Optional[uuid.UUID]:
+def config_parse_uuid(value: str | None, old: str | None) -> uuid.UUID | None:
if not value:
return None
die(f"{value} is not a valid UUID")
-def config_parse_source_date_epoch(value: Optional[str], old: Optional[int]) -> Optional[int]:
+def config_parse_source_date_epoch(value: str | None, old: int | None) -> int | None:
if not value:
return None
return timestamp
-def config_parse_compress_level(value: Optional[str], old: Optional[int]) -> Optional[int]:
+def config_parse_compress_level(value: str | None, old: int | None) -> int | None:
if not value:
return None
return level
-def config_parse_mode(value: Optional[str], old: Optional[int]) -> Optional[int]:
+def config_parse_mode(value: str | None, old: int | None) -> int | None:
if not value:
return None
def config_default_release(namespace: dict[str, Any]) -> str:
- hd: Union[Distribution, str, None]
- hr: Optional[str]
+ hd: Distribution | str | None
+ hr: str | None
if (
(d := os.getenv("MKOSI_HOST_DISTRIBUTION"))
)
-def config_default_source_date_epoch(namespace: dict[str, Any]) -> Optional[int]:
+def config_default_source_date_epoch(namespace: dict[str, Any]) -> int | None:
for env in namespace["environment"]:
if s := startswith(env, "SOURCE_DATE_EPOCH="):
break
return config_parse_source_date_epoch(s, None)
-def config_default_proxy_url(namespace: dict[str, Any]) -> Optional[str]:
+def config_default_proxy_url(namespace: dict[str, Any]) -> str | None:
names = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")
for env in namespace["environment"]:
return None
-def config_default_proxy_exclude(namespace: dict[str, Any]) -> Optional[list[str]]:
+def config_default_proxy_exclude(namespace: dict[str, Any]) -> list[str] | None:
names = ("no_proxy", "NO_PROXY")
for env in namespace["environment"]:
return None
-def config_default_proxy_peer_certificate(namespace: dict[str, Any]) -> Optional[Path]:
+def config_default_proxy_peer_certificate(namespace: dict[str, Any]) -> Path | None:
for p in (Path("/etc/pki/tls/certs/ca-bundle.crt"), Path("/etc/ssl/certs/ca-certificates.crt")):
if p.exists():
return p
def config_make_enum_parser(type: type[SE]) -> ConfigParseCallback[SE]:
- def config_parse_enum(value: Optional[str], old: Optional[SE]) -> Optional[SE]:
+ def config_parse_enum(value: str | None, old: SE | None) -> SE | None:
return make_enum_parser(type)(value) if value else None
return config_parse_enum
def config_make_enum_parser_with_boolean(type: type[SE], *, yes: SE, no: SE) -> ConfigParseCallback[SE]:
- def config_parse_enum(value: Optional[str], old: Optional[SE]) -> Optional[SE]:
+ def config_parse_enum(value: str | None, old: SE | None) -> SE | None:
if not value:
return None
def config_make_list_parser(
*,
- delimiter: Optional[str] = None,
+ delimiter: str | None = None,
parse: Callable[[str], T] = str, # type: ignore # see mypy#3737
unescape: bool = False,
reset: bool = True,
- key: Optional[Callable[[T], Any]] = None,
+ key: Callable[[T], Any] | None = None,
) -> ConfigParseCallback[list[T]]:
- def config_parse_list(value: Optional[str], old: Optional[list[T]]) -> Optional[list[T]]:
+ def config_parse_list(value: str | None, old: list[T] | None) -> list[T] | None:
new = old.copy() if old else []
if value is None:
def config_make_dict_parser(
*,
- delimiter: Optional[str] = None,
+ delimiter: str | None = None,
parse: Callable[[str], tuple[str, PathString]],
unescape: bool = False,
allow_paths: bool = False,
reset: bool = True,
) -> ConfigParseCallback[dict[str, PathString]]:
def config_parse_dict(
- value: Optional[str],
- old: Optional[dict[str, PathString]],
- ) -> Optional[dict[str, PathString]]:
+ value: str | None,
+ old: dict[str, PathString] | None,
+ ) -> dict[str, PathString] | None:
new = old.copy() if old else {}
if value is None:
absolute: bool = False,
constants: Sequence[str] = (),
) -> ConfigParseCallback[Path]:
- def config_parse_path(value: Optional[str], old: Optional[Path]) -> Optional[Path]:
+ def config_parse_path(value: str | None, old: Path | None) -> Path | None:
if not value:
return None
def config_make_filename_parser(hint: str) -> ConfigParseCallback[str]:
- def config_parse_filename(value: Optional[str], old: Optional[str]) -> Optional[str]:
+ def config_parse_filename(value: str | None, old: str | None) -> str | None:
if not value:
return None
def config_parse_root_password(
- value: Optional[str], old: Optional[tuple[str, bool]]
-) -> Optional[tuple[str, bool]]:
+ value: str | None,
+ old: tuple[str, bool] | None,
+) -> tuple[str, bool] | None:
if not value:
return None
return result
-def config_parse_bytes(value: Optional[str], old: Optional[int] = None) -> Optional[int]:
+def config_parse_bytes(value: str | None, old: int | None = None) -> int | None:
if not value:
return None
return parse_bytes(value)
-def config_parse_number(value: Optional[str], old: Optional[int] = None) -> Optional[int]:
+def config_parse_number(value: str | None, old: int | None = None) -> int | None:
if not value:
return None
)
-def config_parse_sector_size(value: Optional[str], old: Optional[int]) -> Optional[int]:
+def config_parse_sector_size(value: str | None, old: int | None) -> int | None:
if not value:
return None
return size
-def config_parse_vsock_cid(value: Optional[str], old: Optional[int]) -> Optional[int]:
+def config_parse_vsock_cid(value: str | None, old: int | None) -> int | None:
if not value:
return None
return cid
-def config_parse_minimum_version(value: Optional[str], old: Optional[str]) -> Optional[str]:
+def config_parse_minimum_version(value: str | None, old: str | None) -> str | None:
if not value:
return old
return f"{self.type}:{self.source}" if self.source else str(self.type)
-def config_parse_key_source(value: Optional[str], old: Optional[KeySource]) -> Optional[KeySource]:
+def config_parse_key_source(value: str | None, old: KeySource | None) -> KeySource | None:
if not value:
return KeySource(type=KeySourceType.file)
def config_parse_certificate_source(
- value: Optional[str],
- old: Optional[CertificateSource],
-) -> Optional[CertificateSource]:
+ value: str | None,
+ old: CertificateSource | None,
+) -> CertificateSource | None:
if not value:
return CertificateSource(type=CertificateSourceType.file)
def config_parse_artifact_output_list(
- value: Optional[str], old: Optional[list[ArtifactOutput]]
-) -> Optional[list[ArtifactOutput]]:
+ value: str | None, old: list[ArtifactOutput] | None
+) -> list[ArtifactOutput] | None:
if not value:
return []
dest: str
section: str
parse: ConfigParseCallback[T] = config_parse_string # type: ignore # see mypy#3737
- match: Optional[ConfigMatchCallback[T]] = None
+ match: ConfigMatchCallback[T] | None = None
name: str = ""
- default: Optional[T] = None
- default_factory: Optional[ConfigDefaultCallback[T]] = None
+ default: T | None = None
+ default_factory: ConfigDefaultCallback[T] | None = None
default_factory_depends: tuple[str, ...] = tuple()
path_suffixes: tuple[str, ...] = ()
recursive_path_suffixes: tuple[str, ...] = ()
scope: SettingScope = SettingScope.local
# settings for argparse
- short: Optional[str] = None
+ short: str | None = None
long: str = ""
- choices: Optional[list[str]] = None
- metavar: Optional[str] = None
- const: Optional[Any] = None
- help: Optional[str] = None
+ choices: list[str] | None = None
+ metavar: str | None = None
+ const: Any | None = None
+ help: str | None = None
# backward compatibility
compat_names: tuple[str, ...] = ()
)
-def parse_chdir(path: str) -> Optional[Path]:
+def parse_chdir(path: str) -> Path | None:
if not path:
# The current directory should be ignored
return None
self,
option_strings: Sequence[str],
dest: str,
- nargs: Union[int, str, None] = None,
+ nargs: int | str | None = None,
default: Any = argparse.SUPPRESS,
- help: Optional[str] = argparse.SUPPRESS,
+ help: str | None = argparse.SUPPRESS,
) -> None:
super().__init__(option_strings, dest, nargs=nargs, default=default, help=help)
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
- values: Union[str, Sequence[Any], None],
- option_string: Optional[str] = None,
+ values: str | Sequence[Any] | None,
+ option_string: str | None = None,
) -> None:
logging.warning(f"{option_string} is no longer supported")
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
- values: Union[str, Sequence[Any], None] = None,
- option_string: Optional[str] = None,
+ values: str | Sequence[Any] | None = None,
+ option_string: str | None = None,
) -> None:
page(parser.format_help(), namespace.pager)
parser.exit()
verb: Verb
cmdline: list[str]
force: int
- directory: Optional[Path]
+ directory: Path | None
debug: bool
debug_shell: bool
debug_workspace: bool
return dataclasses.asdict(self, dict_factory=dict_with_capitalised_keys_factory)
@classmethod
- def from_json(cls, s: Union[str, dict[str, Any], SupportsRead[str], SupportsRead[bytes]]) -> "Args":
+ def from_json(cls, s: str | dict[str, Any] | SupportsRead[str] | SupportsRead[bytes]) -> "Args":
"""Instantiate a Args object from a (partial) JSON dump."""
if isinstance(s, str):
profiles: list[str]
files: list[Path]
dependencies: list[str]
- minimum_version: Optional[str]
+ minimum_version: str | None
pass_environment: list[str]
distribution: Distribution
release: str
architecture: Architecture
- mirror: Optional[str]
- snapshot: Optional[str]
- local_mirror: Optional[str]
+ mirror: str | None
+ snapshot: str | None
+ local_mirror: str | None
repository_key_check: bool
repository_key_fetch: bool
repositories: list[str]
manifest_format: list[ManifestFormat]
output: str
output_extension: str
- output_size: Optional[int]
+ output_size: int | None
compress_output: Compression
compress_level: int
- output_dir: Optional[Path]
- output_mode: Optional[int]
- image_id: Optional[str]
- image_version: Optional[str]
+ output_dir: Path | None
+ output_mode: int | None
+ image_id: str | None
+ image_version: str | None
oci_labels: dict[str, str]
oci_annotations: dict[str, str]
split_artifacts: list[ArtifactOutput]
repart_dirs: list[Path]
- sysupdate_dir: Optional[Path]
- sector_size: Optional[int]
+ sysupdate_dir: Path | None
+ sector_size: int | None
overlay: bool
seed: uuid.UUID
remove_packages: list[str]
remove_files: list[str]
clean_package_metadata: ConfigFeature
- source_date_epoch: Optional[int]
+ source_date_epoch: int | None
configure_scripts: list[Path]
sync_scripts: list[Path]
initrd_volatile_packages: list[str]
microcode_host: bool
devicetrees: list[str]
- splash: Optional[Path]
+ splash: Path | None
kernel_command_line: list[str]
kernel_modules_include: list[str]
kernel_modules_exclude: list[str]
kernel_modules_initrd_exclude: list[str]
kernel_modules_initrd_include_host: bool
- locale: Optional[str]
- locale_messages: Optional[str]
- keymap: Optional[str]
- timezone: Optional[str]
- hostname: Optional[str]
- root_password: Optional[tuple[str, bool]]
- root_shell: Optional[str]
- machine_id: Optional[uuid.UUID]
+ locale: str | None
+ locale_messages: str | None
+ keymap: str | None
+ timezone: str | None
+ hostname: str | None
+ root_password: tuple[str, bool] | None
+ root_shell: str | None
+ machine_id: uuid.UUID | None
autologin: bool
make_initrd: bool
secure_boot: bool
secure_boot_auto_enroll: bool
- secure_boot_key: Optional[Path]
+ secure_boot_key: Path | None
secure_boot_key_source: KeySource
- secure_boot_certificate: Optional[Path]
+ secure_boot_certificate: Path | None
secure_boot_certificate_source: CertificateSource
secure_boot_sign_tool: SecureBootSignTool
verity: Verity
- verity_key: Optional[Path]
+ verity_key: Path | None
verity_key_source: KeySource
- verity_certificate: Optional[Path]
+ verity_certificate: Path | None
verity_certificate_source: CertificateSource
sign_expected_pcr: ConfigFeature
- sign_expected_pcr_key: Optional[Path]
+ sign_expected_pcr_key: Path | None
sign_expected_pcr_key_source: KeySource
- sign_expected_pcr_certificate: Optional[Path]
+ sign_expected_pcr_certificate: Path | None
sign_expected_pcr_certificate_source: CertificateSource
- passphrase: Optional[Path]
+ passphrase: Path | None
checksum: bool
sign: bool
openpgp_tool: str
- key: Optional[str]
+ key: str | None
- tools_tree: Optional[Path]
+ tools_tree: Path | None
tools_tree_certificates: bool
extra_search_paths: list[Path]
incremental: Incremental
cacheonly: Cacheonly
sandbox_trees: list[ConfigTree]
- workspace_dir: Optional[Path]
- cache_dir: Optional[Path]
+ workspace_dir: Path | None
+ cache_dir: Path | None
cache_key: str
- package_cache_dir: Optional[Path]
- build_dir: Optional[Path]
+ package_cache_dir: Path | None
+ build_dir: Path | None
build_key: str
use_subvolumes: ConfigFeature
repart_offline: bool
environment_files: list[Path]
with_tests: bool
with_network: bool
- proxy_url: Optional[str]
+ proxy_url: str | None
proxy_exclude: list[str]
- proxy_peer_certificate: Optional[Path]
- proxy_client_certificate: Optional[Path]
- proxy_client_key: Optional[Path]
+ proxy_peer_certificate: Path | None
+ proxy_client_certificate: Path | None
+ proxy_client_key: Path | None
make_scripts_executable: bool
- nspawn_settings: Optional[Path]
+ nspawn_settings: Path | None
ephemeral: bool
credentials: dict[str, PathString]
kernel_command_line_extra: list[str]
register: ConfigFeature
storage_target_mode: ConfigFeature
runtime_trees: list[ConfigTree]
- runtime_size: Optional[int]
+ runtime_size: int | None
runtime_network: Network
runtime_build_sources: bool
bind_user: bool
- ssh_key: Optional[Path]
- ssh_certificate: Optional[Path]
- machine: Optional[str]
- forward_journal: Optional[Path]
+ ssh_key: Path | None
+ ssh_certificate: Path | None
+ machine: str | None
+ forward_journal: Path | None
vmm: Vmm
console: ConsoleMode
tpm: ConfigFeature
removable: bool
firmware: Firmware
- firmware_variables: Optional[Path]
- linux: Optional[str]
+ firmware_variables: Path | None
+ linux: str | None
drives: list[Drive]
qemu_args: list[str]
@classmethod
def from_partial_json(
cls,
- s: Union[str, dict[str, Any], SupportsRead[str], SupportsRead[bytes]],
+ s: str | dict[str, Any] | SupportsRead[str] | SupportsRead[bytes],
) -> dict[str, Any]:
"""Instantiate a Config object from a (partial) JSON dump."""
if isinstance(s, str):
return {(tk := key_transformer(k)): value_transformer(tk, v) for k, v in j.items()}
@classmethod
- def from_json(cls, s: Union[str, dict[str, Any], SupportsRead[str], SupportsRead[bytes]]) -> "Config":
+ def from_json(cls, s: str | dict[str, Any] | SupportsRead[str] | SupportsRead[bytes]) -> "Config":
return dataclasses.replace(
cls.default(), **{k: v for k, v in cls.from_partial_json(s).items() if k in cls.fields()}
)
- def find_binary(self, *names: PathString, tools: bool = True) -> Optional[Path]:
+ def find_binary(self, *names: PathString, tools: bool = True) -> Path | None:
return find_binary(*names, root=self.tools() if tools else Path("/"), extra=self.extra_search_paths)
def sandbox(
devices: bool = False,
relaxed: bool = False,
tools: bool = True,
- scripts: Optional[Path] = None,
- overlay: Optional[Path] = None,
+ scripts: Path | None = None,
+ overlay: Path | None = None,
options: Sequence[PathString] = (),
) -> AbstractContextManager[list[PathString]]:
opt: list[PathString] = [*options]
We have our own parser instead of using configparser as the latter does not support specifying the same
setting multiple times in the same configuration file.
"""
- section: Optional[str] = None
- setting: Optional[str] = None
- value: Optional[str] = None
+ section: str | None = None
+ setting: str | None = None
+ value: str | None = None
for line in textwrap.dedent(path.read_text()).splitlines():
comment = line.find("#")
help=argparse.SUPPRESS,
)
- last_section: Optional[str] = None
+ last_section: str | None = None
for s in SETTINGS:
if s.section != last_section:
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
- values: Union[str, Sequence[Any], None],
- option_string: Optional[str] = None,
+ values: str | Sequence[Any] | None,
+ option_string: str | None = None,
) -> None:
assert option_string is not None
with chdir(path if path.is_dir() else Path.cwd()):
self.parse_config_one(path if path.is_file() else Path.cwd(), parse_profiles=path.is_dir())
- def finalize_value(self, setting: ConfigSetting[T]) -> Optional[T]:
+ def finalize_value(self, setting: ConfigSetting[T]) -> T | None:
# If a value was specified on the CLI, it always takes priority. If the setting is a collection of
# values, we merge the value from the CLI with the value from the configuration, making sure that the
# value from the CLI always takes priority.
- if (v := cast(Optional[T], self.cli.get(setting.dest))) is not None:
+ if (v := cast(T | None, self.cli.get(setting.dest))) is not None:
cfg_value = self.config.get(setting.dest)
# We either have no corresponding value in the config files
# or the values was assigned the empty string on the CLI
if (
setting.dest not in self.cli
and setting.dest in self.config
- and (v := cast(Optional[T], self.config[setting.dest])) is not None
+ and (v := cast(T | None, self.config[setting.dest])) is not None
):
return v
return default
def match_config(self, path: Path, asserts: bool = False) -> bool:
- condition_triggered: Optional[bool] = None
- match_triggered: Optional[bool] = None
+ condition_triggered: bool | None = None
+ match_triggered: bool | None = None
skip = False
# If the config file does not exist, we assume it matches so that we look at the other files in the
return match_triggered is not False
def parse_config_one(self, path: Path, parse_profiles: bool = False, parse_local: bool = False) -> bool:
- s: Optional[ConfigSetting[object]] # Hint to mypy that we might assign None
+ s: ConfigSetting[object] | None # Hint to mypy that we might assign None
assert path.is_absolute()
extras = path.is_dir()
main: ParseContext,
finalized: dict[str, Any],
*,
- configdir: Optional[Path],
+ configdir: Path | None,
resources: Path,
) -> Config:
context = ParseContext(resources)
return Config.from_dict(context.finalize())
-def finalize_configdir(directory: Optional[Path]) -> Optional[Path]:
+def finalize_configdir(directory: Path | None) -> Path | None:
"""Allow locating all mkosi configuration in a mkosi/ subdirectory
instead of in the top-level directory of a git repository.
"""
argv: Sequence[str] = (),
*,
resources: Path = Path("/"),
-) -> tuple[Args, Optional[Config], tuple[Config, ...]]:
+) -> tuple[Args, Config | None, tuple[Config, ...]]:
argv = list(argv)
context = ParseContext(resources)
# To make this work, we can't use default_factory as it is evaluated too early, so
# we check here to see if dependencies were explicitly provided and if not we gather
# the list of default dependencies while we parse the subimages.
- dependencies: Optional[list[str]] = (
+ dependencies: list[str] | None = (
None if "dependencies" in context.cli or "dependencies" in context.config else []
)
return term if sys.stderr.isatty() else "dumb"
-def finalize_git_config(proxy_url: Optional[str], env: dict[str, str]) -> dict[str, str]:
+def finalize_git_config(proxy_url: str | None, env: dict[str, str]) -> dict[str, str]:
if proxy_url is None:
return {}
return "yes" if b else "no"
-def none_to_na(s: Optional[object]) -> str:
+def none_to_na(s: object | None) -> str:
return "n/a" if s is None else str(s)
-def none_to_random(s: Optional[object]) -> str:
+def none_to_random(s: object | None) -> str:
return "random" if s is None else str(s)
-def none_to_none(s: Optional[object]) -> str:
+def none_to_none(s: object | None) -> str:
return "none" if s is None else str(s)
-def none_to_default(s: Optional[object]) -> str:
+def none_to_default(s: object | None) -> str:
return "default" if s is None else str(s)
return f"{num_bytes}B"
-def format_bytes_or_none(num_bytes: Optional[int]) -> str:
+def format_bytes_or_none(num_bytes: int | None) -> str:
return format_bytes(num_bytes) if num_bytes is not None else "none"
return f"{oct_value:>04o}"
-def format_octal_or_default(oct_value: Optional[int]) -> str:
+def format_octal_or_default(oct_value: int | None) -> str:
return format_octal(oct_value) if oct_value is not None else "default"
return super().default(o)
-def dump_json(dict: dict[str, Any], indent: Optional[int] = 4) -> str:
+def dump_json(dict: dict[str, Any], indent: int | None = 4) -> str:
return json.dumps(dict, cls=JsonEncoder, indent=indent, sort_keys=True)
E = TypeVar("E", bound=StrEnum)
-def json_type_transformer(refcls: Union[type[Args], type[Config]]) -> Callable[[str, Any], Any]:
+def json_type_transformer(refcls: type[Args] | type[Config]) -> Callable[[str, Any], Any]:
fields_by_name = {field.name: field for field in dataclasses.fields(refcls)}
def path_transformer(path: str, fieldtype: type[Path]) -> Path:
return Path(path)
- def optional_path_transformer(path: Optional[str], fieldtype: type[Optional[Path]]) -> Optional[Path]:
+ def optional_path_transformer(path: str | None, fieldtype: type[Path | None]) -> Path | None:
return Path(path) if path is not None else None
def path_list_transformer(pathlist: list[str], fieldtype: type[list[Path]]) -> list[Path]:
return uuid.UUID(uuidstr)
def optional_uuid_transformer(
- uuidstr: Optional[str], fieldtype: type[Optional[uuid.UUID]]
- ) -> Optional[uuid.UUID]:
+ uuidstr: str | None, fieldtype: type[uuid.UUID | None]
+ ) -> uuid.UUID | None:
return uuid.UUID(uuidstr) if uuidstr is not None else None
def root_password_transformer(
- rootpw: Optional[list[Union[str, bool]]], fieldtype: type[Optional[tuple[str, bool]]]
- ) -> Optional[tuple[str, bool]]:
+ rootpw: list[str | bool] | None, fieldtype: type[tuple[str, bool] | None]
+ ) -> tuple[str, bool] | None:
if rootpw is None:
return None
return (cast(str, rootpw[0]), cast(bool, rootpw[1]))
def enum_transformer(enumval: str, fieldtype: type[E]) -> E:
return fieldtype(enumval)
- def optional_enum_transformer(enumval: Optional[str], fieldtype: type[Optional[E]]) -> Optional[E]:
+ def optional_enum_transformer(enumval: str | None, fieldtype: type[E | None]) -> E | None:
return typing.get_args(fieldtype)[0](enumval) if enumval is not None else None
def enum_list_transformer(enumlist: list[str], fieldtype: type[list[E]]) -> list[E]:
return ret
def generic_version_transformer(
- version: Optional[str],
- fieldtype: type[Optional[GenericVersion]],
- ) -> Optional[GenericVersion]:
+ version: str | None,
+ fieldtype: type[GenericVersion | None],
+ ) -> GenericVersion | None:
return GenericVersion(version) if version is not None else None
def certificate_source_transformer(
# to shut up the type checkers and rely on the tests.
transformers: dict[Any, Callable[[Any, Any], Any]] = {
Path: path_transformer,
- Optional[Path]: optional_path_transformer,
+ Path | None: optional_path_transformer,
list[Path]: path_list_transformer,
uuid.UUID: uuid_transformer,
- Optional[uuid.UUID]: optional_uuid_transformer,
- Optional[tuple[str, bool]]: root_password_transformer,
+ uuid.UUID | None: optional_uuid_transformer,
+ tuple[str, bool] | None: root_password_transformer,
list[ConfigTree]: config_tree_transformer,
Architecture: enum_transformer,
BiosBootloader: enum_transformer,
SecureBootSignTool: enum_transformer,
Incremental: enum_transformer,
BuildSourcesEphemeral: enum_transformer,
- Optional[Distribution]: optional_enum_transformer,
+ Distribution | None: optional_enum_transformer,
list[ManifestFormat]: enum_list_transformer,
Verb: enum_transformer,
DocFormat: enum_transformer,
}
def json_transformer(key: str, val: Any) -> Any:
- fieldtype: Optional[dataclasses.Field[Any]] = fields_by_name.get(key)
+ fieldtype: dataclasses.Field[Any] | None = fields_by_name.get(key)
# It is unlikely that the type of a field will be None only, so let's not bother with a different
# sentinel value
if fieldtype is None:
config: Config,
root: Path,
fatal: bool = True,
-) -> Optional[tuple[Path, str, Path, Path]]:
+) -> tuple[Path, str, Path, Path] | None:
if config.selinux_relabel == ConfigFeature.disabled:
return None
def systemd_pty_forward(
config: Config,
*,
- background: Optional[str] = None,
- title: Optional[str] = None,
+ background: str | None = None,
+ title: str | None = None,
) -> list[str]:
tint_bg = parse_boolean(config.environment.get("SYSTEMD_TINT_BACKGROUND", "1")) and parse_boolean(
os.environ.get("SYSTEMD_TINT_BACKGROUND", "1")