import argparse
import ast
import base64
-import collections
import configparser
import contextlib
import crypt
BinaryIO,
Callable,
ContextManager,
- Deque,
Dict,
Iterable,
Iterator,
PartitionTable,
SourceFileTransfer,
Verb,
+ add_packages,
chown_to_running_user,
die,
+ disable_pam_securetty,
is_centos_variant,
is_epel_variant,
is_rpm_distribution,
root_home,
run,
run_workspace_command,
+ scandir_recursive,
set_umask,
should_compress_fs,
should_compress_output,
+ sort_packages,
spawn,
tmp_dir,
warn,
)
from .manifest import Manifest
+from .mounts import mount, mount_api_vfs, mount_bind, mount_overlay, mount_tmpfs
from .syscall import blkpg_add_partition, blkpg_del_partition, reflink
complete_step = MkosiPrinter.complete_step
mkfs_generic(config, "tmp", "/var/tmp", dev)
-def stat_is_whiteout(st: os.stat_result) -> bool:
- return stat.S_ISCHR(st.st_mode) and st.st_rdev == 0
-
-
-def delete_whiteout_files(path: Path) -> None:
- """Delete any char(0,0) device nodes underneath @path
-
- Overlayfs uses such files to mark "whiteouts" (files present in
- the lower layers, but removed in the upper one).
- """
-
- with complete_step("Removing overlay whiteout files…"):
- for entry in cast(Iterator[os.DirEntry[str]], scandir_recursive(path)):
- if stat_is_whiteout(entry.stat(follow_symlinks=False)):
- os.unlink(entry.path)
-
-
-@contextlib.contextmanager
-def mount(
- what: PathString,
- where: Path,
- operation: Optional[str] = None,
- options: Sequence[str] = (),
- type: Optional[str] = None,
- read_only: bool = False,
-) -> Iterator[Path]:
- os.makedirs(where, 0o755, True)
-
- if read_only:
- options = ["ro", *options]
-
- cmd: List[PathString] = ["mount", "--no-mtab"]
-
- if operation:
- cmd += [operation]
-
- cmd += [what, where]
-
- if type:
- cmd += ["--types", type]
-
- if options:
- cmd += ["--options", ",".join(options)]
-
- try:
- run(cmd)
- yield where
- finally:
- run(["umount", "--no-mtab", "--recursive", where])
-
-
def mount_loop(config: MkosiConfig, dev: Path, where: Path, read_only: bool = False) -> ContextManager[Path]:
options = []
if not config.output_format.is_squashfs():
return mount(dev, where, options=options, read_only=read_only)
-def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
- if where is None:
- where = what
-
- os.makedirs(what, 0o755, True)
- os.makedirs(where, 0o755, True)
- return mount(what, where, operation="--bind")
-
-
-def mount_tmpfs(where: Path) -> ContextManager[Path]:
- return mount("tmpfs", where, type="tmpfs")
-
-
-@contextlib.contextmanager
-def mount_overlay(
- base_image: Path, # the path to the mounted base image root
- root: Path, # the path to the destination image root
- read_only: bool = False,
-) -> Iterator[Path]:
- """Set up the overlay mount on `root` with `base_image` as the lower layer.
-
- Sadly the overlay cannot be mounted onto the root directly, because the
- workdir must be on the same filesystem as "upperdir", but cannot be its
- subdirectory. Thus, we set up the overlay and then bind-mount the overlay
- structure into the expected location.
- """
-
- workdir = tempfile.TemporaryDirectory(dir=root, prefix='overlayfs-workdir')
- realroot = root / 'mkosi-real-root'
-
- options = [f'lowerdir={base_image}',
- f'upperdir={realroot}',
- f'workdir={workdir.name}']
-
- try:
- overlay = mount("overlay", realroot, options=options, type="overlay", read_only=read_only)
- with workdir, overlay, mount_bind(realroot, root):
- yield root
- finally:
- with complete_step("Cleaning up overlayfs"):
- # Let's now move the contents of realroot into root
- for entry in os.scandir(realroot):
- os.rename(realroot / entry.name, root / entry.name)
- realroot.rmdir()
-
- delete_whiteout_files(root)
-
-
@contextlib.contextmanager
def mount_image(
state: MkosiState,
etc_hostname.write_text(state.config.hostname + "\n")
-@contextlib.contextmanager
-def mount_api_vfs(root: Path) -> Iterator[None]:
- subdirs = ("proc", "dev", "sys")
-
- with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
- for subdir in subdirs:
- stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
-
- yield
-
-
@contextlib.contextmanager
def mount_cache(state: MkosiState) -> Iterator[None]:
if state.installer is not None:
state.root.joinpath("etc/systemd/network").mkdir(mode=0o755)
-def disable_pam_securetty(root: Path) -> None:
- def _rm_securetty(line: str) -> str:
- if "pam_securetty.so" in line:
- return ""
- return line
-
- patch_file(root / "etc/pam.d/login", _rm_securetty)
-
-
def url_exists(url: str) -> bool:
req = urllib.request.Request(url, method="HEAD")
try:
os.chmod(path, st.st_mode | stat.S_IEXEC)
-def add_packages(
- config: MkosiConfig, packages: Set[str], *names: str, conditional: Optional[str] = None
-) -> None:
-
- """Add packages in @names to @packages, if enabled by --base-packages.
-
- If @conditional is specified, rpm-specific syntax for boolean
- dependencies will be used to include @names if @conditional is
- satisfied.
- """
- assert config.base_packages is True or config.base_packages is False or config.base_packages == "conditional"
-
- if config.base_packages is True or (config.base_packages == "conditional" and conditional):
- for name in names:
- packages.add(f"({name} if {conditional})" if conditional else name)
-
-
-def sort_packages(packages: Iterable[str]) -> List[str]:
- """Sorts packages: normal first, paths second, conditional third"""
-
- m = {"(": 2, "/": 1}
- sort = lambda name: (m.get(name[0], 0), name)
- return sorted(packages, key=sort)
-
-
def make_rpm_list(state: MkosiState, packages: Set[str]) -> Set[str]:
packages = packages.copy()
return f
-def scandir_recursive(
- root: Path,
- filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
-) -> Iterator[T]:
- """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
- queue: Deque[Union[str, Path]] = collections.deque([root])
-
- while queue:
- for entry in os.scandir(queue.pop()):
- pred = filter(entry) if filter is not None else entry
- if pred is not None:
- yield cast(T, pred)
- if entry.is_dir(follow_symlinks=False):
- queue.append(entry.path)
-
-
def find_files(root: Path) -> Iterator[Path]:
"""Generate a list of all filepaths relative to @root"""
yield from scandir_recursive(root,
from __future__ import annotations
import argparse
+import collections
import contextlib
import dataclasses
import enum
TYPE_CHECKING,
Any,
Callable,
+ Deque,
Dict,
+ Iterable,
Iterator,
List,
Mapping,
Sequence,
Set,
Type,
+ TypeVar,
Union,
cast,
)
block_reread_partition_table,
)
-
-
-
+T = TypeVar("T")
+V = TypeVar("V")
PathString = Union[Path, str]
raise MkosiException(f"Attempted path traversal in tar file {tar.name!r}") from e
tar.extractall(path, numeric_owner=numeric_owner)
+
+
+complete_step = MkosiPrinter.complete_step
+
+
+def disable_pam_securetty(root: Path) -> None:
+ def _rm_securetty(line: str) -> str:
+ if "pam_securetty.so" in line:
+ return ""
+ return line
+
+ patch_file(root / "etc/pam.d/login", _rm_securetty)
+
+
+def add_packages(
+ config: MkosiConfig, packages: Set[str], *names: str, conditional: Optional[str] = None
+) -> None:
+
+ """Add packages in @names to @packages, if enabled by --base-packages.
+
+ If @conditional is specified, rpm-specific syntax for boolean
+ dependencies will be used to include @names if @conditional is
+ satisfied.
+ """
+ assert config.base_packages is True or config.base_packages is False or config.base_packages == "conditional"
+
+ if config.base_packages is True or (config.base_packages == "conditional" and conditional):
+ for name in names:
+ packages.add(f"({name} if {conditional})" if conditional else name)
+
+
+def sort_packages(packages: Iterable[str]) -> List[str]:
+ """Sorts packages: normal first, paths second, conditional third"""
+
+ m = {"(": 2, "/": 1}
+ sort = lambda name: (m.get(name[0], 0), name)
+ return sorted(packages, key=sort)
+
+
+def scandir_recursive(
+ root: Path,
+ filter: Optional[Callable[[os.DirEntry[str]], T]] = None,
+) -> Iterator[T]:
+ """Recursively walk the tree starting at @root, optionally apply filter, yield non-none values"""
+ queue: Deque[Union[str, Path]] = collections.deque([root])
+
+ while queue:
+ for entry in os.scandir(queue.pop()):
+ pred = filter(entry) if filter is not None else entry
+ if pred is not None:
+ yield cast(T, pred)
+ if entry.is_dir(follow_symlinks=False):
+ queue.append(entry.path)
--- /dev/null
+# SPDX-License-Identifier: LGPL-2.1+
+
+import contextlib
+import os
+import stat
+import tempfile
+from pathlib import Path
+from typing import ContextManager, Iterator, List, Optional, Sequence, Union, cast
+
+from .backend import complete_step, run, scandir_recursive
+
+PathString = Union[Path, str]
+
+
+def stat_is_whiteout(st: os.stat_result) -> bool:
+ return stat.S_ISCHR(st.st_mode) and st.st_rdev == 0
+
+
+def delete_whiteout_files(path: Path) -> None:
+ """Delete any char(0,0) device nodes underneath @path
+
+ Overlayfs uses such files to mark "whiteouts" (files present in
+ the lower layers, but removed in the upper one).
+ """
+
+ with complete_step("Removing overlay whiteout files…"):
+ for entry in cast(Iterator[os.DirEntry[str]], scandir_recursive(path)):
+ if stat_is_whiteout(entry.stat(follow_symlinks=False)):
+ os.unlink(entry.path)
+
+
+@contextlib.contextmanager
+def mount(
+ what: PathString,
+ where: Path,
+ operation: Optional[str] = None,
+ options: Sequence[str] = (),
+ type: Optional[str] = None,
+ read_only: bool = False,
+) -> Iterator[Path]:
+ os.makedirs(where, 0o755, True)
+
+ if read_only:
+ options = ["ro", *options]
+
+ cmd: List[PathString] = ["mount", "--no-mtab"]
+
+ if operation:
+ cmd += [operation]
+
+ cmd += [what, where]
+
+ if type:
+ cmd += ["--types", type]
+
+ if options:
+ cmd += ["--options", ",".join(options)]
+
+ try:
+ run(cmd)
+ yield where
+ finally:
+ run(["umount", "--no-mtab", "--recursive", where])
+
+
+def mount_bind(what: Path, where: Optional[Path] = None) -> ContextManager[Path]:
+ if where is None:
+ where = what
+
+ os.makedirs(what, 0o755, True)
+ os.makedirs(where, 0o755, True)
+ return mount(what, where, operation="--bind")
+
+
+def mount_tmpfs(where: Path) -> ContextManager[Path]:
+ return mount("tmpfs", where, type="tmpfs")
+
+
+@contextlib.contextmanager
+def mount_overlay(
+ base_image: Path, # the path to the mounted base image root
+ root: Path, # the path to the destination image root
+ read_only: bool = False,
+) -> Iterator[Path]:
+ """Set up the overlay mount on `root` with `base_image` as the lower layer.
+
+ Sadly the overlay cannot be mounted onto the root directly, because the
+ workdir must be on the same filesystem as "upperdir", but cannot be its
+ subdirectory. Thus, we set up the overlay and then bind-mount the overlay
+ structure into the expected location.
+ """
+
+ workdir = tempfile.TemporaryDirectory(dir=root, prefix='overlayfs-workdir')
+ realroot = root / 'mkosi-real-root'
+
+ options = [f'lowerdir={base_image}',
+ f'upperdir={realroot}',
+ f'workdir={workdir.name}']
+
+ try:
+ overlay = mount("overlay", realroot, options=options, type="overlay", read_only=read_only)
+ with workdir, overlay, mount_bind(realroot, root):
+ yield root
+ finally:
+ with complete_step("Cleaning up overlayfs"):
+ # Let's now move the contents of realroot into root
+ for entry in os.scandir(realroot):
+ os.rename(realroot / entry.name, root / entry.name)
+ realroot.rmdir()
+
+ delete_whiteout_files(root)
+
+
+@contextlib.contextmanager
+def mount_api_vfs(root: Path) -> Iterator[None]:
+ subdirs = ("proc", "dev", "sys")
+
+ with complete_step("Mounting API VFS…", "Unmounting API VFS…"), contextlib.ExitStack() as stack:
+ for subdir in subdirs:
+ stack.enter_context(mount_bind(Path("/") / subdir, root / subdir))
+
+ yield