]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Move functions out off __init__
authorJoerg Behrmann <behrmann@physik.fu-berlin.de>
Fri, 16 Sep 2022 17:30:13 +0000 (19:30 +0200)
committerJoerg Behrmann <behrmann@physik.fu-berlin.de>
Thu, 24 Nov 2022 13:55:33 +0000 (14:55 +0100)
Some are moved to backend for lack of a better place for now, most mount-related
functions are moved to a new mounts module, though some are left in __init__ for
later, when the dust settles and cyclic imports may become easier to avoid.

mkosi/__init__.py
mkosi/backend.py
mkosi/mounts.py [new file with mode: 0644]

index 99d164ab324e5b7455c216e2458c37b10f072660..20c4a82ca8e22a1c3b73413accd9568250e75044 100644 (file)
@@ -5,7 +5,6 @@ from __future__ import annotations
 import argparse
 import ast
 import base64
-import collections
 import configparser
 import contextlib
 import crypt
@@ -47,7 +46,6 @@ from typing import (
     BinaryIO,
     Callable,
     ContextManager,
-    Deque,
     Dict,
     Iterable,
     Iterator,
@@ -80,8 +78,10 @@ from .backend import (
     PartitionTable,
     SourceFileTransfer,
     Verb,
+    add_packages,
     chown_to_running_user,
     die,
+    disable_pam_securetty,
     is_centos_variant,
     is_epel_variant,
     is_rpm_distribution,
@@ -94,14 +94,17 @@ from .backend import (
     root_home,
     run,
     run_workspace_command,
+    scandir_recursive,
     set_umask,
     should_compress_fs,
     should_compress_output,
+    sort_packages,
     spawn,
     tmp_dir,
     warn,
 )
 from .manifest import Manifest
+from .mounts import mount, mount_api_vfs, mount_bind, mount_overlay, mount_tmpfs
 from .syscall import blkpg_add_partition, blkpg_del_partition, reflink
 
 complete_step = MkosiPrinter.complete_step
@@ -1419,57 +1422,6 @@ def prepare_tmp(config: MkosiConfig, dev: Optional[Path], cached: bool) -> None:
         mkfs_generic(config, "tmp", "/var/tmp", dev)
 
 
-def stat_is_whiteout(st: os.stat_result) -> bool:
-    return stat.S_ISCHR(st.st_mode) and st.st_rdev == 0
-
-
-def delete_whiteout_files(path: Path) -> None:
-    """Delete any char(0,0) device nodes underneath @path
-
-    Overlayfs uses such files to mark "whiteouts" (files present in
-    the lower layers, but removed in the upper one).
-    """
-
-    with complete_step("Removing overlay whiteout files…"):
-        for entry in cast(Iterator[os.DirEntry[str]], scandir_recursive(path)):
-            if stat_is_whiteout(entry.stat(follow_symlinks=False)):
-                os.unlink(entry.path)
-
-
-@contextlib.contextmanager
-def mount(
-        what: PathString,
-        where: Path,
-        operation: Optional[str] = None,
-        options: Sequence[str] = (),
-        type: Optional[str] = None,
-        read_only: bool = False,
-) -> Iterator[Path]:
-    os.makedirs(where, 0o755, True)
-
-    if read_only:
-        options = ["ro", *options]
-
-    cmd: List[PathString] = ["mount", "--no-mtab"]
-
-    if operation:
-        cmd += [operation]
-
-    cmd += [what, where]
-
-    if type:
-        cmd += ["--types", type]
-
-    if options:
-        cmd += ["--options", ",".join(options)]
-
-    try:
-        run(cmd)
-        yield where
-    finally:
-        run(["umount", "--no-mtab", "--recursive", where])
-
-
 def mount_loop(config: MkosiConfig, dev: Path, where: Path, read_only: bool = False) -> ContextManager[Path]:
     options = []
     if not config.output_format.is_squashfs():
@@ -1482,54 +1434,6 @@ def mount_loop(config: MkosiConfig, dev: Path, where: Path, read_only: bool = Fa
     return mount(dev, where, options=options, read_only=read_only)
 
 
-def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
-    if where is None:
-        where = what
-
-    os.makedirs(what, 0o755, True)
-    os.makedirs(where, 0o755, True)
-    return mount(what, where, operation="--bind")
-
-
-def mount_tmpfs(where: Path) -> ContextManager[Path]:
-    return mount("tmpfs", where, type="tmpfs")
-
-
-@contextlib.contextmanager
-def mount_overlay(
-    base_image: Path,  # the path to the mounted base image root
-    root: Path,        # the path to the destination image root
-    read_only: bool = False,
-) -> Iterator[Path]:
-    """Set up the overlay mount on `root` with `base_image` as the lower layer.
-
-    Sadly the overlay cannot be mounted onto the root directly, because the
-    workdir must be on the same filesystem as "upperdir", but cannot be its
-    subdirectory. Thus, we set up the overlay and then bind-mount the overlay
-    structure into the expected location.
-    """
-
-    workdir = tempfile.TemporaryDirectory(dir=root, prefix='overlayfs-workdir')
-    realroot = root / 'mkosi-real-root'
-
-    options = [f'lowerdir={base_image}',
-               f'upperdir={realroot}',
-               f'workdir={workdir.name}']
-
-    try:
-        overlay = mount("overlay", realroot, options=options, type="overlay", read_only=read_only)
-        with workdir, overlay, mount_bind(realroot, root):
-            yield root
-    finally:
-        with complete_step("Cleaning up overlayfs"):
-            # Let's now move the contents of realroot into root
-            for entry in os.scandir(realroot):
-                os.rename(realroot / entry.name, root / entry.name)
-            realroot.rmdir()
-
-            delete_whiteout_files(root)
-
-
 @contextlib.contextmanager
 def mount_image(
     state: MkosiState,
@@ -1625,17 +1529,6 @@ def configure_hostname(state: MkosiState, cached: bool) -> None:
             etc_hostname.write_text(state.config.hostname + "\n")
 
 
-@contextlib.contextmanager
-def mount_api_vfs(root: Path) -> Iterator[None]:
-    subdirs = ("proc", "dev", "sys")
-
-    with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
-        for subdir in subdirs:
-            stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
-
-        yield
-
-
 @contextlib.contextmanager
 def mount_cache(state: MkosiState) -> Iterator[None]:
     if state.installer is not None:
@@ -1778,15 +1671,6 @@ def prepare_tree(state: MkosiState, cached: bool) -> None:
             state.root.joinpath("etc/systemd/network").mkdir(mode=0o755)
 
 
-def disable_pam_securetty(root: Path) -> None:
-    def _rm_securetty(line: str) -> str:
-        if "pam_securetty.so" in line:
-            return ""
-        return line
-
-    patch_file(root / "etc/pam.d/login", _rm_securetty)
-
-
 def url_exists(url: str) -> bool:
     req = urllib.request.Request(url, method="HEAD")
     try:
@@ -1802,31 +1686,6 @@ def make_executable(path: Path) -> None:
     os.chmod(path, st.st_mode | stat.S_IEXEC)
 
 
-def add_packages(
-    config: MkosiConfig, packages: Set[str], *names: str, conditional: Optional[str] = None
-) -> None:
-
-    """Add packages in @names to @packages, if enabled by --base-packages.
-
-    If @conditional is specified, rpm-specific syntax for boolean
-    dependencies will be used to include @names if @conditional is
-    satisfied.
-    """
-    assert config.base_packages is True or config.base_packages is False or config.base_packages == "conditional"
-
-    if config.base_packages is True or (config.base_packages == "conditional" and conditional):
-        for name in names:
-            packages.add(f"({name} if {conditional})" if conditional else name)
-
-
-def sort_packages(packages: Iterable[str]) -> List[str]:
-    """Sorts packages: normal first, paths second, conditional third"""
-
-    m = {"(": 2, "/": 1}
-    sort = lambda name: (m.get(name[0], 0), name)
-    return sorted(packages, key=sort)
-
-
 def make_rpm_list(state: MkosiState, packages: Set[str]) -> Set[str]:
     packages = packages.copy()
 
@@ -3507,22 +3366,6 @@ def make_tar(state: MkosiState) -> Optional[BinaryIO]:
     return f
 
 
-def scandir_recursive(
-        root: Path,
-        filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
-) -> Iterator[T]:
-    """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
-    queue: Deque[Union[str, Path]] = collections.deque([root])
-
-    while queue:
-        for entry in os.scandir(queue.pop()):
-            pred = filter(entry) if filter is not None else entry
-            if pred is not None:
-                yield cast(T, pred)
-            if entry.is_dir(follow_symlinks=False):
-                queue.append(entry.path)
-
-
 def find_files(root: Path) -> Iterator[Path]:
     """Generate a list of all filepaths relative to @root"""
     yield from scandir_recursive(root,
index 021a4797aef2ef5dc9ba1cacff8220aae8246e64..755607305d26925b74bfa138ae666493e98c1c4c 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import argparse
+import collections
 import contextlib
 import dataclasses
 import enum
@@ -27,7 +28,9 @@ from typing import (
     TYPE_CHECKING,
     Any,
     Callable,
+    Deque,
     Dict,
+    Iterable,
     Iterator,
     List,
     Mapping,
@@ -36,6 +39,7 @@ from typing import (
     Sequence,
     Set,
     Type,
+    TypeVar,
     Union,
     cast,
 )
@@ -47,9 +51,8 @@ from .syscall import (
     block_reread_partition_table,
 )
 
-
-
-
+T = TypeVar("T")
+V = TypeVar("V")
 PathString = Union[Path, str]
 
 
@@ -1036,3 +1039,56 @@ def safe_tar_extract(tar: tarfile.TarFile, path: Path=Path("."), *, numeric_owne
             raise MkosiException(f"Attempted path traversal in tar file {tar.name!r}") from e
 
     tar.extractall(path, numeric_owner=numeric_owner)
+
+
+complete_step = MkosiPrinter.complete_step
+
+
+def disable_pam_securetty(root: Path) -> None:
+    def _rm_securetty(line: str) -> str:
+        if "pam_securetty.so" in line:
+            return ""
+        return line
+
+    patch_file(root / "etc/pam.d/login", _rm_securetty)
+
+
+def add_packages(
+    config: MkosiConfig, packages: Set[str], *names: str, conditional: Optional[str] = None
+) -> None:
+
+    """Add packages in @names to @packages, if enabled by --base-packages.
+
+    If @conditional is specified, rpm-specific syntax for boolean
+    dependencies will be used to include @names if @conditional is
+    satisfied.
+    """
+    assert config.base_packages is True or config.base_packages is False or config.base_packages == "conditional"
+
+    if config.base_packages is True or (config.base_packages == "conditional" and conditional):
+        for name in names:
+            packages.add(f"({name} if {conditional})" if conditional else name)
+
+
+def sort_packages(packages: Iterable[str]) -> List[str]:
+    """Sorts packages: normal first, paths second, conditional third"""
+
+    m = {"(": 2, "/": 1}
+    sort = lambda name: (m.get(name[0], 0), name)
+    return sorted(packages, key=sort)
+
+
+def scandir_recursive(
+    root: Path,
+    filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
+) -> Iterator[T]:
+    """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
+    queue: Deque[Union[str, Path]] = collections.deque([root])
+
+    while queue:
+        for entry in os.scandir(queue.pop()):
+            pred = filter(entry) if filter is not None else entry
+            if pred is not None:
+                yield cast(T, pred)
+            if entry.is_dir(follow_symlinks=False):
+                queue.append(entry.path)
diff --git a/mkosi/mounts.py b/mkosi/mounts.py
new file mode 100644 (file)
index 0000000..6b2959a
--- /dev/null
@@ -0,0 +1,122 @@
+# SPDX-License-Identifier: LGPL-2.1+
+
+import contextlib
+import os
+import stat
+import tempfile
+from pathlib import Path
+from typing import ContextManager, Iterator, List, Optional, Sequence, Union, cast
+
+from .backend import complete_step, run, scandir_recursive
+
+PathString = Union[Path, str]
+
+
+def stat_is_whiteout(st: os.stat_result) -> bool:
+    return stat.S_ISCHR(st.st_mode) and st.st_rdev == 0
+
+
+def delete_whiteout_files(path: Path) -> None:
+    """Delete any char(0,0) device nodes underneath @path
+
+    Overlayfs uses such files to mark "whiteouts" (files present in
+    the lower layers, but removed in the upper one).
+    """
+
+    with complete_step("Removing overlay whiteout files…"):
+        for entry in cast(Iterator[os.DirEntry[str]], scandir_recursive(path)):
+            if stat_is_whiteout(entry.stat(follow_symlinks=False)):
+                os.unlink(entry.path)
+
+
+@contextlib.contextmanager
+def mount(
+        what: PathString,
+        where: Path,
+        operation: Optional[str] = None,
+        options: Sequence[str] = (),
+        type: Optional[str] = None,
+        read_only: bool = False,
+) -> Iterator[Path]:
+    os.makedirs(where, 0o755, True)
+
+    if read_only:
+        options = ["ro", *options]
+
+    cmd: List[PathString] = ["mount", "--no-mtab"]
+
+    if operation:
+        cmd += [operation]
+
+    cmd += [what, where]
+
+    if type:
+        cmd += ["--types", type]
+
+    if options:
+        cmd += ["--options", ",".join(options)]
+
+    try:
+        run(cmd)
+        yield where
+    finally:
+        run(["umount", "--no-mtab", "--recursive", where])
+
+
+def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
+    if where is None:
+        where = what
+
+    os.makedirs(what, 0o755, True)
+    os.makedirs(where, 0o755, True)
+    return mount(what, where, operation="--bind")
+
+
+def mount_tmpfs(where: Path) -> ContextManager[Path]:
+    return mount("tmpfs", where, type="tmpfs")
+
+
+@contextlib.contextmanager
+def mount_overlay(
+    base_image: Path,  # the path to the mounted base image root
+    root: Path,        # the path to the destination image root
+    read_only: bool = False,
+) -> Iterator[Path]:
+    """Set up the overlay mount on `root` with `base_image` as the lower layer.
+
+    Sadly the overlay cannot be mounted onto the root directly, because the
+    workdir must be on the same filesystem as "upperdir", but cannot be its
+    subdirectory. Thus, we set up the overlay and then bind-mount the overlay
+    structure into the expected location.
+    """
+
+    workdir = tempfile.TemporaryDirectory(dir=root, prefix='overlayfs-workdir')
+    realroot = root / 'mkosi-real-root'
+
+    options = [f'lowerdir={base_image}',
+               f'upperdir={realroot}',
+               f'workdir={workdir.name}']
+
+    try:
+        overlay = mount("overlay", realroot, options=options, type="overlay", read_only=read_only)
+        with workdir, overlay, mount_bind(realroot, root):
+            yield root
+    finally:
+        with complete_step("Cleaning up overlayfs"):
+            # Let's now move the contents of realroot into root
+            for entry in os.scandir(realroot):
+                os.rename(realroot / entry.name, root / entry.name)
+            realroot.rmdir()
+
+            delete_whiteout_files(root)
+
+
+@contextlib.contextmanager
+def mount_api_vfs(root: Path) -> Iterator[None]:
+    subdirs = ("proc", "dev", "sys")
+
+    with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
+        for subdir in subdirs:
+            stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
+
+        yield