]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Soothe some CodeQL warnings
authorJoerg Behrmann <behrmann@physik.fu-berlin.de>
Wed, 21 Sep 2022 15:02:47 +0000 (17:02 +0200)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Wed, 21 Sep 2022 16:43:06 +0000 (18:43 +0200)
mkosi/__init__.py
tests/test_backend.py
tests/test_config_parser.py

index 472934252e976c8fa5f9e2dea418b1c3c0eba5bc..9561e7bf3e88ff9f71214ecc5692abec10be2538 100644 (file)
@@ -34,14 +34,12 @@ import string
 import subprocess
 import sys
 import tempfile
-import textwrap
 import time
 import urllib.parse
 import urllib.request
 import uuid
 from pathlib import Path
-from subprocess import DEVNULL, PIPE
-from textwrap import dedent
+from textwrap import dedent, wrap
 from typing import (
     IO,
     TYPE_CHECKING,
@@ -677,12 +675,14 @@ def btrfs_subvol_create(path: Path, mode: int = 0o755) -> None:
 
 def btrfs_subvol_delete(path: Path) -> None:
     # Extract the path of the subvolume relative to the filesystem
-    c = run(["btrfs", "subvol", "show", path], stdout=PIPE, stderr=DEVNULL, text=True)
+    c = run(["btrfs", "subvol", "show", path],
+            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
     subvol_path = c.stdout.splitlines()[0]
     # Make the subvolume RW again if it was set RO by btrfs_subvol_delete
     run(["btrfs", "property", "set", path, "ro", "false"])
     # Recursively delete the direct children of the subvolume
-    c = run(["btrfs", "subvol", "list", "-o", path], stdout=PIPE, stderr=DEVNULL, text=True)
+    c = run(["btrfs", "subvol", "list", "-o", path],
+            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
     for line in c.stdout.splitlines():
         if not line:
             continue
@@ -690,7 +690,8 @@ def btrfs_subvol_delete(path: Path) -> None:
         child_path = path / cast(str, os.path.relpath(child_subvol_path, subvol_path))
         btrfs_subvol_delete(child_path)
     # Delete the subvolume now that all its descendants have been deleted
-    run(["btrfs", "subvol", "delete", path], stdout=DEVNULL, stderr=DEVNULL)
+    run(["btrfs", "subvol", "delete", path],
+        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 
 
 def btrfs_subvol_make_ro(path: Path, b: bool = True) -> None:
@@ -720,7 +721,8 @@ def is_generated_root(config: Union[argparse.Namespace, MkosiConfig]) -> bool:
 def disable_cow(path: PathString) -> None:
     """Disable copy-on-write if applicable on filesystem"""
 
-    run(["chattr", "+C", path], stdout=DEVNULL, stderr=DEVNULL, check=False)
+    run(["chattr", "+C", path],
+        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
 
 
 def root_partition_description(
@@ -947,7 +949,7 @@ def flock(file: BinaryIO) -> Iterator[None]:
 @contextlib.contextmanager
 def get_loopdev(f: BinaryIO) -> Iterator[BinaryIO]:
     with complete_step(f"Attaching {f.name} as loopback…", "Detaching {}") as output:
-        c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=PIPE, text=True)
+        c = run(["losetup", "--find", "--show", "--partscan", f.name], stdout=subprocess.PIPE, text=True)
         loopdev = Path(c.stdout.strip())
         output += [loopdev]
 
@@ -2400,7 +2402,8 @@ def install_centos_variant(state: MkosiState) -> None:
 
 
 def debootstrap_knows_arg(arg: str) -> bool:
-    return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg], stdout=PIPE, check=False).stdout
+    return bytes("invalid option", "UTF-8") not in run(["debootstrap", arg],
+                                                       stdout=subprocess.PIPE, check=False).stdout
 
 
 @contextlib.contextmanager
@@ -2469,7 +2472,7 @@ def add_apt_auxiliary_repos(state: MkosiState, repos: Set[str]) -> None:
 
 
 def add_apt_package_if_exists(state: MkosiState, extra_packages: Set[str], package: str) -> None:
-    if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=PIPE).stdout.strip() != "":
+    if invoke_apt(state, "cache", "search", ["--names-only", f"^{package}$"], stdout=subprocess.PIPE).stdout.strip():
         add_packages(state.config, extra_packages, package)
 
 
@@ -3267,7 +3270,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra
 
     uid = int(os.getenv("SUDO_UID", 0))
 
-    c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=PIPE, text=False, user=uid)
+    c = run(["git", "-C", src, "ls-files", "-z", *what_files], stdout=subprocess.PIPE, text=False, user=uid)
     files: Set[str] = {x.decode("utf-8") for x in c.stdout.rstrip(b"\0").split(b"\0")}
 
     # Add the .git/ directory in as well.
@@ -3280,7 +3283,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra
                 files.add(fr)
 
     # Get submodule files
-    c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=PIPE, text=True, user=uid)
+    c = run(["git", "-C", src, "submodule", "status", "--recursive"], stdout=subprocess.PIPE, text=True, user=uid)
     submodules = {x.split()[1] for x in c.stdout.splitlines()}
 
     # workaround for git ls-files returning the path of submodules that we will
@@ -3291,7 +3294,7 @@ def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTra
         sm = Path(sm)
         c = run(
             ["git", "-C", src / sm, "ls-files", "-z"] + what_files,
-            stdout=PIPE,
+            stdout=subprocess.PIPE,
             text=False,
             user=uid,
         )
@@ -3699,7 +3702,7 @@ def make_verity(state: MkosiState, dev: Optional[Path]) -> Tuple[Optional[Binary
 
     with complete_step("Generating verity hashes…"):
         f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(dir=state.config.output.parent, prefix=".mkosi-"))
-        c = run(["veritysetup", "format", dev, f.name], stdout=PIPE)
+        c = run(["veritysetup", "format", dev, f.name], stdout=subprocess.PIPE)
 
         for line in c.stdout.decode("utf-8").split("\n"):
             if line.startswith("Root hash:"):
@@ -4700,7 +4703,6 @@ class BooleanAction(argparse.Action):
         values: Union[str, Sequence[Any], None, bool],
         option_string: Optional[str] = None,
     ) -> None:
-        new_value = self.default
         if isinstance(values, str):
             try:
                 new_value = parse_boolean(values)
@@ -4783,12 +4785,10 @@ class CustomHelpFormatter(argparse.HelpFormatter):
         """
         lines = text.splitlines()
         subindent = '    ' if lines[0].endswith(':') else ''
-        return list(itertools.chain.from_iterable(
-            textwrap.wrap(line, width,
-                          break_long_words=False,
-                          break_on_hyphens=False,
-                          subsequent_indent=subindent)
-            for line in lines))
+        return list(itertools.chain.from_iterable(wrap(line, width,
+                                                       break_long_words=False, break_on_hyphens=False,
+                                                       subsequent_indent=subindent) for line in lines))
+
 
 class ArgumentParserMkosi(argparse.ArgumentParser):
     """ArgumentParser with support for mkosi configuration file(s)
@@ -5935,7 +5935,8 @@ def unlink_try_hard(path: Optional[PathString]) -> None:
 
     path = Path(path)
     try:
-        return path.unlink()
+        path.unlink()
+        return
     except FileNotFoundError:
         return
     except Exception:
@@ -6965,7 +6966,7 @@ def setup_ssh(state: MkosiState, cached: bool) -> Optional[TextIO]:
                 ["ssh-keygen", "-f", f.name, "-N", state.config.password or "", "-C", "mkosi", "-t", "ed25519"],
                 input="y\n",
                 text=True,
-                stdout=DEVNULL,
+                stdout=subprocess.DEVNULL,
             )
 
         copy_file(f"{f.name}.pub", authorized_keys)
@@ -7834,7 +7835,9 @@ def run_qemu(config: MkosiConfig) -> None:
 
 
 def interface_exists(dev: str) -> bool:
-    return run(["ip", "link", "show", dev], stdout=DEVNULL, stderr=DEVNULL, check=False).returncode == 0
+    rc = run(["ip", "link", "show", dev],
+             stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False).returncode
+    return rc == 0
 
 
 def find_address(config: MkosiConfig) -> Tuple[str, str]:
@@ -7854,7 +7857,8 @@ def find_address(config: MkosiConfig) -> Tuple[str, str]:
             else:
                 die(f"Container/VM interface ve-{name}/vt-{name} not found")
 
-            link = json.loads(run(["ip", "-j", "link", "show", "dev", dev], stdout=PIPE, text=True).stdout)[0]
+            link = json.loads(run(["ip", "-j", "link", "show", "dev", dev],
+                                  stdout=subprocess.PIPE, text=True).stdout)[0]
             if link["operstate"] == "DOWN":
                 raise MkosiException(
                     f"{dev} is not enabled. Make sure systemd-networkd is running so it can manage the interface."
@@ -7862,11 +7866,11 @@ def find_address(config: MkosiConfig) -> Tuple[str, str]:
 
             # Trigger IPv6 neighbor discovery of which we can access the results via 'ip neighbor'. This allows us to
             # find out the link-local IPv6 address of the container/VM via which we can connect to it.
-            run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=DEVNULL)
+            run(["ping", "-c", "1", "-w", "15", f"ff02::1%{dev}"], stdout=subprocess.DEVNULL)
 
             for _ in range(50):
                 neighbors = json.loads(
-                    run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=PIPE, text=True).stdout
+                    run(["ip", "-j", "neighbor", "show", "dev", dev], stdout=subprocess.PIPE, text=True).stdout
                 )
 
                 for neighbor in neighbors:
@@ -8009,7 +8013,7 @@ def bump_image_version(config: MkosiConfig) -> None:
             new_version = ".".join(v[:-1] + [str(m + 1)])
             print(f"Increasing last component of version by one, bumping '{config.image_version}' → '{new_version}'.")
 
-    open("mkosi.version", "w").write(new_version + "\n")
+    Path("mkosi.version").write_text(new_version + "\n")
 
 
 def expand_paths(paths: Sequence[str]) -> List[Path]:
index 0bae4af2b1e09d44916e9657107b6963cf46275a..3e5dd0517548803c7456b136ef0a240925b109d7 100644 (file)
@@ -3,8 +3,7 @@
 import os
 from pathlib import Path
 
-import mkosi.backend as backend
-from mkosi.backend import Distribution, PackageType, set_umask
+from mkosi.backend import Distribution, PackageType, PartitionTable, set_umask, workspace
 
 
 def test_distribution() -> None:
@@ -27,21 +26,21 @@ def test_set_umask() -> None:
 
 
 def test_workspace() -> None:
-    assert backend.workspace(Path("/home/folder/mkosi/mkosi")) == Path("/home/folder/mkosi")
-    assert backend.workspace(Path("/home/../home/folder/mkosi/mkosi")) == Path("/home/../home/folder/mkosi")
-    assert backend.workspace(Path("/")) == Path("/")
-    assert backend.workspace(Path()) == Path()
+    assert workspace(Path("/home/folder/mkosi/mkosi")) == Path("/home/folder/mkosi")
+    assert workspace(Path("/home/../home/folder/mkosi/mkosi")) == Path("/home/../home/folder/mkosi")
+    assert workspace(Path("/")) == Path("/")
+    assert workspace(Path()) == Path()
 
 
 def test_footer_size() -> None:
-    table = backend.PartitionTable()
+    table = PartitionTable()
     assert table.footer_size() == 16896
     assert table.footer_size(max_partitions=64) == 8704
     assert table.footer_size(max_partitions=1) == 1024
     assert table.footer_size(max_partitions=0) == 512
 
 def test_first_partition_offset() -> None:
-    table = backend.PartitionTable()
+    table = PartitionTable()
     table.grain = 4096
 
     # Grain = 4096, first_lba = None.
@@ -75,7 +74,7 @@ def test_first_partition_offset() -> None:
 
 
 def test_last_partition_offset() -> None:
-    table = backend.PartitionTable()
+    table = PartitionTable()
     table.grain = 4096
 
     table.last_partition_sector = 32
@@ -96,7 +95,7 @@ def test_last_partition_offset() -> None:
 
 
 def test_disk_size() -> None:
-    table = backend.PartitionTable()
+    table = PartitionTable()
     table.grain = 4096
     table.last_partition_sector = 0
     table.first_lba = 64
index 6d51e3ff346e2aff010bd4b178d41ace86a1afd3..74ed847b7cc4aaca71cdd9b88e5562d84e77f919 100644 (file)
@@ -204,7 +204,7 @@ class MkosiConfig:
             fname = f"{prio:03d}_{fname}"
         config_parser = configparser.RawConfigParser()
         # This is still an open issue on: https://github.com/python/mypy/issues/2427
-        config_parser.optionxform = lambda optionstr: str(optionstr) # type: ignore
+        config_parser.optionxform = str # type: ignore
 
         # Replace lists in dict before calling config_parser write file
         config_all_normalized = copy.deepcopy(config)
@@ -812,7 +812,7 @@ class MkosiConfigIniLists1(MkosiConfigOne):
         ini_lines = [
             "[Content]",
             "Packages=[ vim,!vi",
-            "  ca-certificates, bzip ]" "",
+            "  ca-certificates, bzip ]  ",
             "[Output]",
             "KernelCommandLine=console=ttyS1",
             "  driver.feature=1",
@@ -919,7 +919,7 @@ def test_builtin(tested_config: Any, tmpdir: Path) -> None:
     with change_cwd(tmpdir):
         if "--all" in tested_config.cli_arguments:
             with pytest.raises(MkosiException):
-                args = mkosi.parse_args(tested_config.cli_arguments)
+                mkosi.parse_args(tested_config.cli_arguments)
         else:
             args = mkosi.parse_args(tested_config.cli_arguments)
             assert tested_config == args