]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Start moving functions that copy files into the image into a new install module
authorJoerg Behrmann <behrmann@physik.fu-berlin.de>
Sun, 18 Sep 2022 15:20:54 +0000 (17:20 +0200)
committerJoerg Behrmann <behrmann@physik.fu-berlin.de>
Thu, 24 Nov 2022 14:51:33 +0000 (15:51 +0100)
mkosi/__init__.py
mkosi/gentoo.py
mkosi/install.py [new file with mode: 0644]
tests/test_init.py
tests/test_install.py [new file with mode: 0644]

index 14d5a4a791669729b806200189b1cfb989e07033..c5305375122beb4181c71476f4a79c31361245a3 100644 (file)
@@ -19,7 +19,7 @@ import getpass
 import glob
 import hashlib
 import http.server
-import importlib.resources
+import importlib
 import itertools
 import json
 import math
@@ -28,7 +28,6 @@ import platform
 import re
 import shlex
 import shutil
-import stat
 import string
 import subprocess
 import sys
@@ -103,9 +102,19 @@ from .backend import (
     tmp_dir,
     warn,
 )
+from .install import (
+    add_dropin_config,
+    add_dropin_config_from_resource,
+    copy_file,
+    copy_file_object,
+    copy_path,
+    install_skeleton_trees,
+    open_close,
+    write_resource,
+)
 from .manifest import Manifest
 from .mounts import mount, mount_api_vfs, mount_bind, mount_overlay, mount_tmpfs
-from .syscall import blkpg_add_partition, blkpg_del_partition, reflink
+from .syscall import blkpg_add_partition, blkpg_del_partition
 
 complete_step = MkosiPrinter.complete_step
 color_error = MkosiPrinter.color_error
@@ -154,33 +163,6 @@ DRACUT_SYSTEMD_EXTRAS = [
 ]
 
 
-def write_resource(
-        where: Path, resource: str, key: str, *, executable: bool = False, mode: Optional[int] = None
-) -> None:
-    text = importlib.resources.read_text(resource, key)
-    where.write_text(text)
-    if mode is not None:
-        where.chmod(mode)
-    elif executable:
-        make_executable(where)
-
-
-def add_dropin_config(root: Path, unit: str, name: str, content: str) -> None:
-    """Add a dropin config `name.conf` in /etc/systemd/system for `unit`."""
-    dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
-    dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
-    dropin.write_text(dedent(content))
-    dropin.chmod(0o644)
-
-
-def add_dropin_config_from_resource(
-    root: Path, unit: str, name: str, resource: str, key: str
-) -> None:
-    dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
-    dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
-    write_resource(dropin, resource, key, mode=0o644)
-
-
 T = TypeVar("T")
 V = TypeVar("V")
 
@@ -593,93 +575,6 @@ def format_bytes(num_bytes: int) -> str:
     return f"{num_bytes}B"
 
 
-@contextlib.contextmanager
-def open_close(path: PathString, flags: int, mode: int = 0o664) -> Iterator[int]:
-    fd = os.open(path, flags | os.O_CLOEXEC, mode)
-    try:
-        yield fd
-    finally:
-        os.close(fd)
-
-
-def copy_fd(oldfd: int, newfd: int) -> None:
-    try:
-        reflink(oldfd, newfd)
-    except OSError as e:
-        if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
-            raise
-        # While mypy handles this correctly, Pyright doesn't yet.
-        shutil.copyfileobj(open(oldfd, "rb", closefd=False), cast(Any, open(newfd, "wb", closefd=False)))
-
-
-def copy_file_object(oldobject: BinaryIO, newobject: BinaryIO) -> None:
-    try:
-        reflink(oldobject.fileno(), newobject.fileno())
-    except OSError as e:
-        if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
-            raise
-        shutil.copyfileobj(oldobject, newobject)
-        newobject.flush()
-
-
-def copy_file(oldpath: PathString, newpath: PathString) -> None:
-    oldpath = Path(oldpath)
-    newpath = Path(newpath)
-
-    if oldpath.is_symlink():
-        src = os.readlink(oldpath)  # TODO: use oldpath.readlink() with python3.9+
-        newpath.symlink_to(src)
-        return
-
-    with open_close(oldpath, os.O_RDONLY) as oldfd:
-        st = os.stat(oldfd)
-
-        try:
-            with open_close(newpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, st.st_mode) as newfd:
-                copy_fd(oldfd, newfd)
-        except FileExistsError:
-            newpath.unlink()
-            with open_close(newpath, os.O_WRONLY | os.O_CREAT, st.st_mode) as newfd:
-                copy_fd(oldfd, newfd)
-    shutil.copystat(oldpath, newpath, follow_symlinks=False)
-
-
-def symlink_f(target: str, path: Path) -> None:
-    try:
-        path.symlink_to(target)
-    except FileExistsError:
-        os.unlink(path)
-        path.symlink_to(target)
-
-
-def copy_path(oldpath: PathString, newpath: Path, *, copystat: bool = True) -> None:
-    try:
-        newpath.mkdir(exist_ok=True)
-    except FileExistsError:
-        # something that is not a directory already exists
-        newpath.unlink()
-        newpath.mkdir()
-
-    for entry in os.scandir(oldpath):
-        newentry = newpath / entry.name
-        if entry.is_dir(follow_symlinks=False):
-            copy_path(entry.path, newentry)
-        elif entry.is_symlink():
-            target = os.readlink(entry.path)
-            symlink_f(target, newentry)
-            shutil.copystat(entry.path, newentry, follow_symlinks=False)
-        else:
-            st = entry.stat(follow_symlinks=False)
-            if stat.S_ISREG(st.st_mode):
-                copy_file(entry.path, newentry)
-            else:
-                print("Ignoring", entry.path)
-                continue
-
-    if copystat:
-        shutil.copystat(oldpath, newpath, follow_symlinks=True)
-
-
 @complete_step("Detaching namespace")
 def init_namespace() -> None:
     unshare(CLONE_NEWNS)
@@ -1679,11 +1574,6 @@ def url_exists(url: str) -> bool:
     return False
 
 
-def make_executable(path: Path) -> None:
-    st = path.stat()
-    os.chmod(path, st.st_mode | stat.S_IEXEC)
-
-
 def make_rpm_list(state: MkosiState, packages: Set[str]) -> Set[str]:
     packages = packages.copy()
 
@@ -2724,6 +2614,7 @@ def install_distribution(state: MkosiState, cached: bool) -> None:
     # installation has completed.
     link_rpm_db(state.root)
 
+
 def remove_packages(state: MkosiState) -> None:
     """Remove packages listed in config.remove_packages"""
 
@@ -2999,26 +2890,6 @@ def install_extra_trees(state: MkosiState) -> None:
                 shutil.unpack_archive(cast(str, tree), state.root)
 
 
-def install_skeleton_trees(state: MkosiState, cached: bool, *, late: bool=False) -> None:
-    if not state.config.skeleton_trees:
-        return
-
-    if cached:
-        return
-
-    if not late and state.config.distribution in (Distribution.debian, Distribution.ubuntu):
-        return
-
-    with complete_step("Copying in skeleton file trees…"):
-        for tree in state.config.skeleton_trees:
-            if tree.is_dir():
-                copy_path(tree, state.root, copystat=False)
-            else:
-                # unpack_archive() groks Paths, but mypy doesn't know this.
-                # Pretend that tree is a str.
-                shutil.unpack_archive(cast(str, tree), state.root)
-
-
 def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTransfer) -> None:
     what_files = ["--exclude-standard", "--cached"]
     if source_file_transfer == SourceFileTransfer.copy_git_others:
index e8fb1696027dce7d3a992f9a7e7ce4665faa98d9..5b5864db8f2e8c485d4112469240598144963001 100644 (file)
@@ -11,7 +11,7 @@ from pathlib import Path
 from textwrap import dedent
 from typing import Dict, Generator, List, Sequence
 
-from . import copy_path, open_close, unlink_try_hard
+from . import unlink_try_hard
 from .backend import (
     ARG_DEBUG,
     MkosiConfig,
@@ -25,6 +25,7 @@ from .backend import (
     run_workspace_command,
     safe_tar_extract,
 )
+from .install import copy_path, open_close
 
 ARCHITECTURES = {
     "x86_64": ("amd64", "arch/x86/boot/bzImage"),
diff --git a/mkosi/install.py b/mkosi/install.py
new file mode 100644 (file)
index 0000000..9a2c3b8
--- /dev/null
@@ -0,0 +1,153 @@
+# SPDX-License-Identifier: LGPL-2.1+
+
+import contextlib
+import errno
+import importlib.resources
+import os
+import shutil
+import stat
+from pathlib import Path
+from textwrap import dedent
+from typing import Any, BinaryIO, Iterator, Optional, cast
+
+from .backend import Distribution, MkosiState, PathString, complete_step
+from .syscall import reflink
+
+
+def make_executable(path: Path) -> None:
+    st = path.stat()
+    os.chmod(path, st.st_mode | stat.S_IEXEC)
+
+
+def write_resource(
+    where: Path, resource: str, key: str, *, executable: bool = False, mode: Optional[int] = None
+) -> None:
+    text = importlib.resources.read_text(resource, key)
+    where.write_text(text)
+    if mode is not None:
+        where.chmod(mode)
+    elif executable:
+        make_executable(where)
+
+
+def add_dropin_config(root: Path, unit: str, name: str, content: str) -> None:
+    """Add a dropin config `name.conf` in /etc/systemd/system for `unit`."""
+    dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
+    dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
+    dropin.write_text(dedent(content))
+    dropin.chmod(0o644)
+
+
+def add_dropin_config_from_resource(
+    root: Path, unit: str, name: str, resource: str, key: str
+) -> None:
+    dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
+    dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
+    write_resource(dropin, resource, key, mode=0o644)
+
+
+@contextlib.contextmanager
+def open_close(path: PathString, flags: int, mode: int = 0o664) -> Iterator[int]:
+    fd = os.open(path, flags | os.O_CLOEXEC, mode)
+    try:
+        yield fd
+    finally:
+        os.close(fd)
+
+
+def copy_fd(oldfd: int, newfd: int) -> None:
+    try:
+        reflink(oldfd, newfd)
+    except OSError as e:
+        if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
+            raise
+        # While mypy handles this correctly, Pyright doesn't yet.
+        shutil.copyfileobj(open(oldfd, "rb", closefd=False), cast(Any, open(newfd, "wb", closefd=False)))
+
+
+def copy_file_object(oldobject: BinaryIO, newobject: BinaryIO) -> None:
+    try:
+        reflink(oldobject.fileno(), newobject.fileno())
+    except OSError as e:
+        if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
+            raise
+        shutil.copyfileobj(oldobject, newobject)
+        newobject.flush()
+
+
+def copy_file(oldpath: PathString, newpath: PathString) -> None:
+    oldpath = Path(oldpath)
+    newpath = Path(newpath)
+
+    if oldpath.is_symlink():
+        src = os.readlink(oldpath)  # TODO: use oldpath.readlink() with python3.9+
+        newpath.symlink_to(src)
+        return
+
+    with open_close(oldpath, os.O_RDONLY) as oldfd:
+        st = os.stat(oldfd)
+
+        try:
+            with open_close(newpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, st.st_mode) as newfd:
+                copy_fd(oldfd, newfd)
+        except FileExistsError:
+            newpath.unlink()
+            with open_close(newpath, os.O_WRONLY | os.O_CREAT, st.st_mode) as newfd:
+                copy_fd(oldfd, newfd)
+    shutil.copystat(oldpath, newpath, follow_symlinks=False)
+
+
+def symlink_f(target: str, path: Path) -> None:
+    try:
+        path.symlink_to(target)
+    except FileExistsError:
+        os.unlink(path)
+        path.symlink_to(target)
+
+
+def copy_path(oldpath: PathString, newpath: Path, *, copystat: bool = True) -> None:
+    try:
+        newpath.mkdir(exist_ok=True)
+    except FileExistsError:
+        # something that is not a directory already exists
+        newpath.unlink()
+        newpath.mkdir()
+
+    for entry in os.scandir(oldpath):
+        newentry = newpath / entry.name
+        if entry.is_dir(follow_symlinks=False):
+            copy_path(entry.path, newentry)
+        elif entry.is_symlink():
+            target = os.readlink(entry.path)
+            symlink_f(target, newentry)
+            shutil.copystat(entry.path, newentry, follow_symlinks=False)
+        else:
+            st = entry.stat(follow_symlinks=False)
+            if stat.S_ISREG(st.st_mode):
+                copy_file(entry.path, newentry)
+            else:
+                print("Ignoring", entry.path)
+                continue
+
+    if copystat:
+        shutil.copystat(oldpath, newpath, follow_symlinks=True)
+
+
+def install_skeleton_trees(state: MkosiState, cached: bool, *, late: bool=False) -> None:
+    if not state.config.skeleton_trees:
+        return
+
+    if cached:
+        return
+
+    if not late and state.config.distribution in (Distribution.debian, Distribution.ubuntu):
+        return
+
+    with complete_step("Copying in skeleton file trees…"):
+        for tree in state.config.skeleton_trees:
+            if tree.is_dir():
+                copy_path(tree, state.root, copystat=False)
+            else:
+                # unpack_archive() groks Paths, but mypy doesn't know this.
+                # Pretend that tree is a str.
+                shutil.unpack_archive(cast(str, tree), state.root)
index 5cda63385a553211ea12386a64b31e359eba13aa..8b622527c014644aec71e7fbe63051892ede34f1 100644 (file)
@@ -1,6 +1,5 @@
 # SPDX-License-Identifier: LGPL-2.1+
 
-import filecmp
 from pathlib import Path
 
 import pytest
@@ -31,32 +30,6 @@ def test_strip_suffixes() -> None:
     assert mkosi.strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")
 
 
-def test_copy_file(tmpdir: Path) -> None:
-    dir_path = Path(tmpdir)
-    file_1 = Path(dir_path) / "file_1.txt"
-    file_2 = Path(dir_path) / "file_2.txt"
-    file_1.touch()
-    file_2.touch()
-
-    # Copying two empty files.
-    mkosi.copy_file(file_1, file_2)
-    assert filecmp.cmp(file_1, file_2)
-
-    # Copying content from one file.
-    file_1.write_text("Testing copying content from this file to file_2.")
-    mkosi.copy_file(file_1, file_2)
-    assert filecmp.cmp(file_1, file_2)
-
-    # Giving a non existing path/file.
-    with pytest.raises(OSError):
-        mkosi.copy_file("nullFilePath", file_1)
-
-    # Copying when there's already content in both files.
-    file_2.write_text("Testing copying content from file_1 to file_2, with previous data.")
-    mkosi.copy_file(file_1, file_2)
-    assert filecmp.cmp(file_1, file_2)
-
-
 def test_parse_bytes() -> None:
     assert mkosi.parse_bytes(None) == 0
     assert mkosi.parse_bytes("1") == 512
diff --git a/tests/test_install.py b/tests/test_install.py
new file mode 100644 (file)
index 0000000..13258e5
--- /dev/null
@@ -0,0 +1,33 @@
+# SPDX-License-Identifier: LGPL-2.1+
+
+import filecmp
+from pathlib import Path
+
+import pytest
+
+from mkosi.install import copy_file
+
+def test_copy_file(tmpdir: Path) -> None:
+    dir_path = Path(tmpdir)
+    file_1 = Path(dir_path) / "file_1.txt"
+    file_2 = Path(dir_path) / "file_2.txt"
+    file_1.touch()
+    file_2.touch()
+
+    # Copying two empty files.
+    copy_file(file_1, file_2)
+    assert filecmp.cmp(file_1, file_2)
+
+    # Copying content from one file.
+    file_1.write_text("Testing copying content from this file to file_2.")
+    copy_file(file_1, file_2)
+    assert filecmp.cmp(file_1, file_2)
+
+    # Giving a non existing path/file.
+    with pytest.raises(OSError):
+        copy_file("nullFilePath", file_1)
+
+    # Copying when there's already content in both files.
+    file_2.write_text("Testing copying content from file_1 to file_2, with previous data.")
+    copy_file(file_1, file_2)
+    assert filecmp.cmp(file_1, file_2)