import subprocess
import sys
import tempfile
-import textwrap
import time
import urllib.parse
import urllib.request
import uuid
from pathlib import Path
-from subprocess import DEVNULL, PIPE
-from textwrap import dedent
+from textwrap import dedent, wrap
from typing import (
IO,
TYPE_CHECKING,
def btrfs_subvol_delete(path: Path) -> None:
# Extract the path of the subvolume relative to the filesystem
- c = run(["btrfs", "subvol", "show", path], stdout=PIPE, stderr=DEVNULL, text=True)
+ c = run(["btrfs", "subvol", "show", path],
+ stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
subvol_path = c.stdout.splitlines()[0]
# Make the subvolume RW again if it was set RO by btrfs_subvol_delete
run(["btrfs", "property", "set", path, "ro", "false"])
# Recursively delete the direct children of the subvolume
- c = run(["btrfs", "subvol", "list", "-o", path], stdout=PIPE, stderr=DEVNULL, text=True)
+ c = run(["btrfs", "subvol", "list", "-o", path],
+ stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
for line in c.stdout.splitlines():
if not line:
continue
child_path = path / cast(str, os.path.relpath(child_subvol_path, subvol_path))
btrfs_subvol_delete(child_path)
# Delete the subvolume now that all its descendants have been deleted
- run(["btrfs", "subvol", "delete", path], stdout=DEVNULL, stderr=DEVNULL)
+ run(["btrfs", "subvol", "delete", path],
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def btrfs_subvol_make_ro(path: Path, b: bool = True) -> None:
def disable_cow(path: PathString) -> None:
"""Disable copy-on-write if applicable on filesystem"""
- run(["chattr", "+C", path], stdout=DEVNULL, stderr=DEVNULL, check=False)
+ run(["chattr", "+C", path],
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
def root_partition_description(
@contextlib.contextmanager
def get_loopdev(f: BinaryIO) -> Iterator[BinaryIO]:
with complete_step(f"Attaching {f.name} as loopback…", "Detaching {}") as output:
- c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=PIPE, text=True)
+ c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=subprocess.PIPE, text=True)
loopdev = Path(c.stdout.strip())
output += [loopdev]
def debootstrap_knows_arg(arg: str) -> bool:
- return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg], stdout=PIPE, check=False).stdout
+ return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg],
+ stdout=subprocess.PIPE, check=False).stdout
@contextlib.contextmanager
def add_apt_package_if_exists(state: MkosiState, extra_packages: Set[str], package: str) -> None:
- if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=PIPE).stdout.strip() != "":
+ if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=subprocess.PIPE).stdout.strip():
add_packages(state.config, extra_packages, package)
uid = int(os.getenv("SUDO_UID", 0))
- c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=PIPE, text=False, user=uid)
+ c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=subprocess.PIPE, text=False, user=uid)
files: Set[str] = {x.decode("utf-8") for x in c.stdout.rstrip(b"\0").split(b"\0")}
# Add the .git/ directory in as well.
files.add(fr)
# Get submodule files
- c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=PIPE, text=True, user=uid)
+ c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=subprocess.PIPE, text=True, user=uid)
submodules = {x.split()[1] for x in c.stdout.splitlines()}
# workaround for git ls-files returning the path of submodules that we will
sm = Path(sm)
c = run(
["git", "-C", src / sm, "ls-files", "-z"] + what_files,
- stdout=PIPE,
+ stdout=subprocess.PIPE,
text=False,
user=uid,
)
with complete_step("Generating verity hashes…"):
f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(dir=state.config.output.parent, prefix=".mkosi-"))
- c = run(["veritysetup", "format", dev, f.name], stdout=PIPE)
+ c = run(["veritysetup", "format", dev, f.name], stdout=subprocess.PIPE)
for line in c.stdout.decode("utf-8").split("\n"):
if line.startswith("Root hash:"):
values: Union[str, Sequence[Any], None, bool],
option_string: Optional[str] = None,
) -> None:
- new_value = self.default
if isinstance(values, str):
try:
new_value = parse_boolean(values)
"""
lines = text.splitlines()
subindent = ' ' if lines[0].endswith(':') else ''
- return list(itertools.chain.from_iterable(
- textwrap.wrap(line, width,
- break_long_words=False,
- break_on_hyphens=False,
- subsequent_indent=subindent)
- for line in lines))
+ return list(itertools.chain.from_iterable(wrap(line, width,
+ break_long_words=False, break_on_hyphens=False,
+ subsequent_indent=subindent) for line in lines))
+
class ArgumentParserMkosi(argparse.ArgumentParser):
"""ArgumentParser with support for mkosi configuration file(s)
path = Path(path)
try:
- return path.unlink()
+ path.unlink()
+ return
except FileNotFoundError:
return
except Exception:
["ssh-keygen", "-f", f.name, "-N", state.config.password or "", "-C", "mkosi", "-t", "ed25519"],
input="y\n",
text=True,
- stdout=DEVNULL,
+ stdout=subprocess.DEVNULL,
)
copy_file(f"{f.name}.pub", authorized_keys)
def interface_exists(dev: str) -> bool:
- return run(["ip", "link", "show", dev], stdout=DEVNULL, stderr=DEVNULL, check=False).returncode == 0
+ rc = run(["ip", "link", "show", dev],
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False).returncode
+ return rc == 0
def find_address(config: MkosiConfig) -> Tuple[str, str]:
else:
die(f"Container/VM interface ve-{name}/vt-{name} not found")
- link = json.loads(run(["ip", "-j", "link", "show", "dev", dev], stdout=PIPE, text=True).stdout)[0]
+ link = json.loads(run(["ip", "-j", "link", "show", "dev", dev],
+ stdout=subprocess.PIPE, text=True).stdout)[0]
if link["operstate"] == "DOWN":
raise MkosiException(
f"{dev} is not enabled. Make sure systemd-networkd is running so it can manage the interface."
# Trigger IPv6 neighbor discovery of which we can access the results via 'ip neighbor'. This allows us to
# find out the link-local IPv6 address of the container/VM via which we can connect to it.
- run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=DEVNULL)
+ run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=subprocess.DEVNULL)
for _ in range(50):
neighbors = json.loads(
- run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=PIPE, text=True).stdout
+ run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=subprocess.PIPE, text=True).stdout
)
for neighbor in neighbors:
new_version = ".".join(v[:-1] + [str(m + 1)])
print(f"Increasing last component of version by one, bumping '{config.image_version}' → '{new_version}'.")
- open("mkosi.version", "w").write(new_version + "\n")
+ Path("mkosi.version").write_text(new_version + "\n")
def expand_paths(paths: Sequence[str]) -> List[Path]: