From: Joerg Behrmann Date: Wed, 21 Sep 2022 15:02:47 +0000 (+0200) Subject: Soothe some CodeQL warnings X-Git-Tag: v14~28 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=67249a17330cce1e403a4dd9d923d39fdcf1bbca;p=thirdparty%2Fmkosi.git Soothe some CodeQL warnings --- diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 472934252..9561e7bf3 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -34,14 +34,12 @@ import string import subprocess import sys import tempfile -import textwrap import time import urllib.parse import urllib.request import uuid from pathlib import Path -from subprocess import DEVNULL, PIPE -from textwrap import dedent +from textwrap import dedent, wrap from typing import ( IO, TYPE_CHECKING, @@ -677,12 +675,14 @@ def btrfs_subvol_create(path: Path, mode: int = 0o755) -> None: def btrfs_subvol_delete(path: Path) -> None: # Extract the path of the subvolume relative to the filesystem - c = run(["btrfs", "subvol", "show", path], stdout=PIPE, stderr=DEVNULL, text=True) + c = run(["btrfs", "subvol", "show", path], + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) subvol_path = c.stdout.splitlines()[0] # Make the subvolume RW again if it was set RO by btrfs_subvol_delete run(["btrfs", "property", "set", path, "ro", "false"]) # Recursively delete the direct children of the subvolume - c = run(["btrfs", "subvol", "list", "-o", path], stdout=PIPE, stderr=DEVNULL, text=True) + c = run(["btrfs", "subvol", "list", "-o", path], + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) for line in c.stdout.splitlines(): if not line: continue @@ -690,7 +690,8 @@ def btrfs_subvol_delete(path: Path) -> None: child_path = path / cast(str, os.path.relpath(child_subvol_path, subvol_path)) btrfs_subvol_delete(child_path) # Delete the subvolume now that all its descendants have been deleted - run(["btrfs", "subvol", "delete", path], stdout=DEVNULL, stderr=DEVNULL) + run(["btrfs", "subvol", "delete", path], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def btrfs_subvol_make_ro(path: Path, b: bool = True) -> None: @@ -720,7 +721,8 @@ def is_generated_root(config: Union[argparse.Namespace, MkosiConfig]) -> bool: def disable_cow(path: PathString) -> None: """Disable copy-on-write if applicable on filesystem""" - run(["chattr", "+C", path], stdout=DEVNULL, stderr=DEVNULL, check=False) + run(["chattr", "+C", path], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) def root_partition_description( @@ -947,7 +949,7 @@ def flock(file: BinaryIO) -> Iterator[None]: @contextlib.contextmanager def get_loopdev(f: BinaryIO) -> Iterator[BinaryIO]: with complete_step(f"Attaching {f.name} as loopback…", "Detaching {}") as output: - c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=PIPE, text=True) + c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=subprocess.PIPE, text=True) loopdev = Path(c.stdout.strip()) output += [loopdev] @@ -2400,7 +2402,8 @@ def install_centos_variant(state: MkosiState) -> None: def debootstrap_knows_arg(arg: str) -> bool: - return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg], stdout=PIPE, check=False).stdout + return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg], + stdout=subprocess.PIPE, check=False).stdout @contextlib.contextmanager @@ -2469,7 +2472,7 @@ def add_apt_auxiliary_repos(state: MkosiState, repos: Set[str]) -> None: def add_apt_package_if_exists(state: MkosiState, extra_packages: Set[str], package: str) -> None: - if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=PIPE).stdout.strip() != "": + if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=subprocess.PIPE).stdout.strip(): add_packages(state.config, extra_packages, package) @@ -3267,7 +3270,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra uid = int(os.getenv("SUDO_UID", 0)) - c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=PIPE, text=False, user=uid) + c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=subprocess.PIPE, text=False, user=uid) files: Set[str] = {x.decode("utf-8") for x in c.stdout.rstrip(b"\0").split(b"\0")} # Add the .git/ directory in as well. @@ -3280,7 +3283,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra files.add(fr) # Get submodule files - c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=PIPE, text=True, user=uid) + c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=subprocess.PIPE, text=True, user=uid) submodules = {x.split()[1] for x in c.stdout.splitlines()} # workaround for git ls-files returning the path of submodules that we will @@ -3291,7 +3294,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra sm = Path(sm) c = run( ["git", "-C", src / sm, "ls-files", "-z"] + what_files, - stdout=PIPE, + stdout=subprocess.PIPE, text=False, user=uid, ) @@ -3699,7 +3702,7 @@ def make_verity(state: MkosiState, dev: Optional[Path]) -> Tuple[Optional[Binary with complete_step("Generating verity hashes…"): f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(dir=state.config.output.parent, prefix=".mkosi-")) - c = run(["veritysetup", "format", dev, f.name], stdout=PIPE) + c = run(["veritysetup", "format", dev, f.name], stdout=subprocess.PIPE) for line in c.stdout.decode("utf-8").split("\n"): if line.startswith("Root hash:"): @@ -4700,7 +4703,6 @@ class BooleanAction(argparse.Action): values: Union[str, Sequence[Any], None, bool], option_string: Optional[str] = None, ) -> None: - new_value = self.default if isinstance(values, str): try: new_value = parse_boolean(values) @@ -4783,12 +4785,10 @@ class CustomHelpFormatter(argparse.HelpFormatter): """ lines = text.splitlines() subindent = ' ' if lines[0].endswith(':') else '' - return list(itertools.chain.from_iterable( - textwrap.wrap(line, width, - break_long_words=False, - break_on_hyphens=False, - subsequent_indent=subindent) - for line in lines)) + return list(itertools.chain.from_iterable(wrap(line, width, + break_long_words=False, break_on_hyphens=False, + subsequent_indent=subindent) for line in lines)) + class ArgumentParserMkosi(argparse.ArgumentParser): """ArgumentParser with support for mkosi configuration file(s) @@ -5935,7 +5935,8 @@ def unlink_try_hard(path: Optional[PathString]) -> None: path = Path(path) try: - return path.unlink() + path.unlink() + return except FileNotFoundError: return except Exception: @@ -6965,7 +6966,7 @@ def setup_ssh(state: MkosiState, cached: bool) -> Optional[TextIO]: ["ssh-keygen", "-f", f.name, "-N", state.config.password or "", "-C", "mkosi", "-t", "ed25519"], input="y\n", text=True, - stdout=DEVNULL, + stdout=subprocess.DEVNULL, ) copy_file(f"{f.name}.pub", authorized_keys) @@ -7834,7 +7835,9 @@ def run_qemu(config: MkosiConfig) -> None: def interface_exists(dev: str) -> bool: - return run(["ip", "link", "show", dev], stdout=DEVNULL, stderr=DEVNULL, check=False).returncode == 0 + rc = run(["ip", "link", "show", dev], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False).returncode + return rc == 0 def find_address(config: MkosiConfig) -> Tuple[str, str]: @@ -7854,7 +7857,8 @@ def find_address(config: MkosiConfig) -> Tuple[str, str]: else: die(f"Container/VM interface ve-{name}/vt-{name} not found") - link = json.loads(run(["ip", "-j", "link", "show", "dev", dev], stdout=PIPE, text=True).stdout)[0] + link = json.loads(run(["ip", "-j", "link", "show", "dev", dev], + stdout=subprocess.PIPE, text=True).stdout)[0] if link["operstate"] == "DOWN": raise MkosiException( f"{dev} is not enabled. Make sure systemd-networkd is running so it can manage the interface." @@ -7862,11 +7866,11 @@ def find_address(config: MkosiConfig) -> Tuple[str, str]: # Trigger IPv6 neighbor discovery of which we can access the results via 'ip neighbor'. This allows us to # find out the link-local IPv6 address of the container/VM via which we can connect to it. - run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=DEVNULL) + run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=subprocess.DEVNULL) for _ in range(50): neighbors = json.loads( - run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=PIPE, text=True).stdout + run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=subprocess.PIPE, text=True).stdout ) for neighbor in neighbors: @@ -8009,7 +8013,7 @@ def bump_image_version(config: MkosiConfig) -> None: new_version = ".".join(v[:-1] + [str(m + 1)]) print(f"Increasing last component of version by one, bumping '{config.image_version}' → '{new_version}'.") - open("mkosi.version", "w").write(new_version + "\n") + Path("mkosi.version").write_text(new_version + "\n") def expand_paths(paths: Sequence[str]) -> List[Path]: diff --git a/tests/test_backend.py b/tests/test_backend.py index 0bae4af2b..3e5dd0517 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -3,8 +3,7 @@ import os from pathlib import Path -import mkosi.backend as backend -from mkosi.backend import Distribution, PackageType, set_umask +from mkosi.backend import Distribution, PackageType, PartitionTable, set_umask, workspace def test_distribution() -> None: @@ -27,21 +26,21 @@ def test_set_umask() -> None: def test_workspace() -> None: - assert backend.workspace(Path("/home/folder/mkosi/mkosi")) == Path("/home/folder/mkosi") - assert backend.workspace(Path("/home/../home/folder/mkosi/mkosi")) == Path("/home/../home/folder/mkosi") - assert backend.workspace(Path("/")) == Path("/") - assert backend.workspace(Path()) == Path() + assert workspace(Path("/home/folder/mkosi/mkosi")) == Path("/home/folder/mkosi") + assert workspace(Path("/home/../home/folder/mkosi/mkosi")) == Path("/home/../home/folder/mkosi") + assert workspace(Path("/")) == Path("/") + assert workspace(Path()) == Path() def test_footer_size() -> None: - table = backend.PartitionTable() + table = PartitionTable() assert table.footer_size() == 16896 assert table.footer_size(max_partitions=64) == 8704 assert table.footer_size(max_partitions=1) == 1024 assert table.footer_size(max_partitions=0) == 512 def test_first_partition_offset() -> None: - table = backend.PartitionTable() + table = PartitionTable() table.grain = 4096 # Grain = 4096, first_lba = None. @@ -75,7 +74,7 @@ def test_first_partition_offset() -> None: def test_last_partition_offset() -> None: - table = backend.PartitionTable() + table = PartitionTable() table.grain = 4096 table.last_partition_sector = 32 @@ -96,7 +95,7 @@ def test_last_partition_offset() -> None: def test_disk_size() -> None: - table = backend.PartitionTable() + table = PartitionTable() table.grain = 4096 table.last_partition_sector = 0 table.first_lba = 64 diff --git a/tests/test_config_parser.py b/tests/test_config_parser.py index 6d51e3ff3..74ed847b7 100644 --- a/tests/test_config_parser.py +++ b/tests/test_config_parser.py @@ -204,7 +204,7 @@ class MkosiConfig: fname = f"{prio:03d}_{fname}" config_parser = configparser.RawConfigParser() # This is still an open issue on: https://github.com/python/mypy/issues/2427 - config_parser.optionxform = lambda optionstr: str(optionstr) # type: ignore + config_parser.optionxform = str # type: ignore # Replace lists in dict before calling config_parser write file config_all_normalized = copy.deepcopy(config) @@ -812,7 +812,7 @@ class MkosiConfigIniLists1(MkosiConfigOne): ini_lines = [ "[Content]", "Packages=[ vim,!vi", - " ca-certificates, bzip ]" "", + " ca-certificates, bzip ] ", "[Output]", "KernelCommandLine=console=ttyS1", " driver.feature=1", @@ -919,7 +919,7 @@ def test_builtin(tested_config: Any, tmpdir: Path) -> None: with change_cwd(tmpdir): if "--all" in tested_config.cli_arguments: with pytest.raises(MkosiException): - args = mkosi.parse_args(tested_config.cli_arguments) + mkosi.parse_args(tested_config.cli_arguments) else: args = mkosi.parse_args(tested_config.cli_arguments) assert tested_config == args