import glob
import hashlib
import http.server
-import importlib.resources
+import importlib
import itertools
import json
import math
import re
import shlex
import shutil
-import stat
import string
import subprocess
import sys
tmp_dir,
warn,
)
+from .install import (
+ add_dropin_config,
+ add_dropin_config_from_resource,
+ copy_file,
+ copy_file_object,
+ copy_path,
+ install_skeleton_trees,
+ open_close,
+ write_resource,
+)
from .manifest import Manifest
from .mounts import mount, mount_api_vfs, mount_bind, mount_overlay, mount_tmpfs
-from .syscall import blkpg_add_partition, blkpg_del_partition, reflink
+from .syscall import blkpg_add_partition, blkpg_del_partition
complete_step = MkosiPrinter.complete_step
color_error = MkosiPrinter.color_error
]
-def write_resource(
- where: Path, resource: str, key: str, *, executable: bool = False, mode: Optional[int] = None
-) -> None:
- text = importlib.resources.read_text(resource, key)
- where.write_text(text)
- if mode is not None:
- where.chmod(mode)
- elif executable:
- make_executable(where)
-
-
-def add_dropin_config(root: Path, unit: str, name: str, content: str) -> None:
- """Add a dropin config `name.conf` in /etc/systemd/system for `unit`."""
- dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
- dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
- dropin.write_text(dedent(content))
- dropin.chmod(0o644)
-
-
-def add_dropin_config_from_resource(
- root: Path, unit: str, name: str, resource: str, key: str
-) -> None:
- dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
- dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
- write_resource(dropin, resource, key, mode=0o644)
-
-
T = TypeVar("T")
V = TypeVar("V")
return f"{num_bytes}B"
-@contextlib.contextmanager
-def open_close(path: PathString, flags: int, mode: int = 0o664) -> Iterator[int]:
- fd = os.open(path, flags | os.O_CLOEXEC, mode)
- try:
- yield fd
- finally:
- os.close(fd)
-
-
-def copy_fd(oldfd: int, newfd: int) -> None:
- try:
- reflink(oldfd, newfd)
- except OSError as e:
- if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
- raise
- # While mypy handles this correctly, Pyright doesn't yet.
- shutil.copyfileobj(open(oldfd, "rb", closefd=False), cast(Any, open(newfd, "wb", closefd=False)))
-
-
-def copy_file_object(oldobject: BinaryIO, newobject: BinaryIO) -> None:
- try:
- reflink(oldobject.fileno(), newobject.fileno())
- except OSError as e:
- if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
- raise
- shutil.copyfileobj(oldobject, newobject)
- newobject.flush()
-
-
-def copy_file(oldpath: PathString, newpath: PathString) -> None:
- oldpath = Path(oldpath)
- newpath = Path(newpath)
-
- if oldpath.is_symlink():
- src = os.readlink(oldpath) # TODO: use oldpath.readlink() with python3.9+
- newpath.symlink_to(src)
- return
-
- with open_close(oldpath, os.O_RDONLY) as oldfd:
- st = os.stat(oldfd)
-
- try:
- with open_close(newpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, st.st_mode) as newfd:
- copy_fd(oldfd, newfd)
- except FileExistsError:
- newpath.unlink()
- with open_close(newpath, os.O_WRONLY | os.O_CREAT, st.st_mode) as newfd:
- copy_fd(oldfd, newfd)
- shutil.copystat(oldpath, newpath, follow_symlinks=False)
-
-
-def symlink_f(target: str, path: Path) -> None:
- try:
- path.symlink_to(target)
- except FileExistsError:
- os.unlink(path)
- path.symlink_to(target)
-
-
-def copy_path(oldpath: PathString, newpath: Path, *, copystat: bool = True) -> None:
- try:
- newpath.mkdir(exist_ok=True)
- except FileExistsError:
- # something that is not a directory already exists
- newpath.unlink()
- newpath.mkdir()
-
- for entry in os.scandir(oldpath):
- newentry = newpath / entry.name
- if entry.is_dir(follow_symlinks=False):
- copy_path(entry.path, newentry)
- elif entry.is_symlink():
- target = os.readlink(entry.path)
- symlink_f(target, newentry)
- shutil.copystat(entry.path, newentry, follow_symlinks=False)
- else:
- st = entry.stat(follow_symlinks=False)
- if stat.S_ISREG(st.st_mode):
- copy_file(entry.path, newentry)
- else:
- print("Ignoring", entry.path)
- continue
-
- if copystat:
- shutil.copystat(oldpath, newpath, follow_symlinks=True)
-
-
@complete_step("Detaching namespace")
def init_namespace() -> None:
unshare(CLONE_NEWNS)
return False
-def make_executable(path: Path) -> None:
- st = path.stat()
- os.chmod(path, st.st_mode | stat.S_IEXEC)
-
-
def make_rpm_list(state: MkosiState, packages: Set[str]) -> Set[str]:
packages = packages.copy()
# installation has completed.
link_rpm_db(state.root)
+
def remove_packages(state: MkosiState) -> None:
"""Remove packages listed in config.remove_packages"""
shutil.unpack_archive(cast(str, tree), state.root)
-def install_skeleton_trees(state: MkosiState, cached: bool, *, late: bool=False) -> None:
- if not state.config.skeleton_trees:
- return
-
- if cached:
- return
-
- if not late and state.config.distribution in (Distribution.debian, Distribution.ubuntu):
- return
-
- with complete_step("Copying in skeleton file trees…"):
- for tree in state.config.skeleton_trees:
- if tree.is_dir():
- copy_path(tree, state.root, copystat=False)
- else:
- # unpack_archive() groks Paths, but mypy doesn't know this.
- # Pretend that tree is a str.
- shutil.unpack_archive(cast(str, tree), state.root)
-
-
def copy_git_files(src: Path, dest: Path, *, source_file_transfer: SourceFileTransfer) -> None:
what_files = ["--exclude-standard", "--cached"]
if source_file_transfer == SourceFileTransfer.copy_git_others:
from textwrap import dedent
from typing import Dict, Generator, List, Sequence
-from . import copy_path, open_close, unlink_try_hard
+from . import unlink_try_hard
from .backend import (
ARG_DEBUG,
MkosiConfig,
run_workspace_command,
safe_tar_extract,
)
+from .install import copy_path, open_close
ARCHITECTURES = {
"x86_64": ("amd64", "arch/x86/boot/bzImage"),
--- /dev/null
+# SPDX-License-Identifier: LGPL-2.1+
+
+import contextlib
+import errno
+import importlib.resources
+import os
+import shutil
+import stat
+from pathlib import Path
+from textwrap import dedent
+from typing import Any, BinaryIO, Iterator, Optional, cast
+
+from .backend import Distribution, MkosiState, PathString, complete_step
+from .syscall import reflink
+
+
+def make_executable(path: Path) -> None:
+ st = path.stat()
+ os.chmod(path, st.st_mode | stat.S_IEXEC)
+
+
+def write_resource(
+ where: Path, resource: str, key: str, *, executable: bool = False, mode: Optional[int] = None
+) -> None:
+ text = importlib.resources.read_text(resource, key)
+ where.write_text(text)
+ if mode is not None:
+ where.chmod(mode)
+ elif executable:
+ make_executable(where)
+
+
+def add_dropin_config(root: Path, unit: str, name: str, content: str) -> None:
+ """Add a dropin config `name.conf` in /etc/systemd/system for `unit`."""
+ dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
+ dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
+ dropin.write_text(dedent(content))
+ dropin.chmod(0o644)
+
+
+def add_dropin_config_from_resource(
+ root: Path, unit: str, name: str, resource: str, key: str
+) -> None:
+ dropin = root / f"etc/systemd/system/{unit}.d/{name}.conf"
+ dropin.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
+ write_resource(dropin, resource, key, mode=0o644)
+
+
+@contextlib.contextmanager
+def open_close(path: PathString, flags: int, mode: int = 0o664) -> Iterator[int]:
+ fd = os.open(path, flags | os.O_CLOEXEC, mode)
+ try:
+ yield fd
+ finally:
+ os.close(fd)
+
+
+def copy_fd(oldfd: int, newfd: int) -> None:
+ try:
+ reflink(oldfd, newfd)
+ except OSError as e:
+ if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
+ raise
+ # While mypy handles this correctly, Pyright doesn't yet.
+ shutil.copyfileobj(open(oldfd, "rb", closefd=False), cast(Any, open(newfd, "wb", closefd=False)))
+
+
+def copy_file_object(oldobject: BinaryIO, newobject: BinaryIO) -> None:
+ try:
+ reflink(oldobject.fileno(), newobject.fileno())
+ except OSError as e:
+ if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP, errno.ENOTTY}:
+ raise
+ shutil.copyfileobj(oldobject, newobject)
+ newobject.flush()
+
+
+def copy_file(oldpath: PathString, newpath: PathString) -> None:
+ oldpath = Path(oldpath)
+ newpath = Path(newpath)
+
+ if oldpath.is_symlink():
+ src = os.readlink(oldpath) # TODO: use oldpath.readlink() with python3.9+
+ newpath.symlink_to(src)
+ return
+
+ with open_close(oldpath, os.O_RDONLY) as oldfd:
+ st = os.stat(oldfd)
+
+ try:
+ with open_close(newpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, st.st_mode) as newfd:
+ copy_fd(oldfd, newfd)
+ except FileExistsError:
+ newpath.unlink()
+ with open_close(newpath, os.O_WRONLY | os.O_CREAT, st.st_mode) as newfd:
+ copy_fd(oldfd, newfd)
+ shutil.copystat(oldpath, newpath, follow_symlinks=False)
+
+
+def symlink_f(target: str, path: Path) -> None:
+ try:
+ path.symlink_to(target)
+ except FileExistsError:
+ os.unlink(path)
+ path.symlink_to(target)
+
+
+def copy_path(oldpath: PathString, newpath: Path, *, copystat: bool = True) -> None:
+ try:
+ newpath.mkdir(exist_ok=True)
+ except FileExistsError:
+ # something that is not a directory already exists
+ newpath.unlink()
+ newpath.mkdir()
+
+ for entry in os.scandir(oldpath):
+ newentry = newpath / entry.name
+ if entry.is_dir(follow_symlinks=False):
+ copy_path(entry.path, newentry)
+ elif entry.is_symlink():
+ target = os.readlink(entry.path)
+ symlink_f(target, newentry)
+ shutil.copystat(entry.path, newentry, follow_symlinks=False)
+ else:
+ st = entry.stat(follow_symlinks=False)
+ if stat.S_ISREG(st.st_mode):
+ copy_file(entry.path, newentry)
+ else:
+ print("Ignoring", entry.path)
+ continue
+
+ if copystat:
+ shutil.copystat(oldpath, newpath, follow_symlinks=True)
+
+
+def install_skeleton_trees(state: MkosiState, cached: bool, *, late: bool=False) -> None:
+ if not state.config.skeleton_trees:
+ return
+
+ if cached:
+ return
+
+ if not late and state.config.distribution in (Distribution.debian, Distribution.ubuntu):
+ return
+
+ with complete_step("Copying in skeleton file trees…"):
+ for tree in state.config.skeleton_trees:
+ if tree.is_dir():
+ copy_path(tree, state.root, copystat=False)
+ else:
+ # unpack_archive() groks Paths, but mypy doesn't know this.
+ # Pretend that tree is a str.
+ shutil.unpack_archive(cast(str, tree), state.root)
# SPDX-License-Identifier: LGPL-2.1+
-import filecmp
from pathlib import Path
import pytest
assert mkosi.strip_suffixes(Path("home.xz/test.txt")) == Path("home.xz/test.txt")
-def test_copy_file(tmpdir: Path) -> None:
- dir_path = Path(tmpdir)
- file_1 = Path(dir_path) / "file_1.txt"
- file_2 = Path(dir_path) / "file_2.txt"
- file_1.touch()
- file_2.touch()
-
- # Copying two empty files.
- mkosi.copy_file(file_1, file_2)
- assert filecmp.cmp(file_1, file_2)
-
- # Copying content from one file.
- file_1.write_text("Testing copying content from this file to file_2.")
- mkosi.copy_file(file_1, file_2)
- assert filecmp.cmp(file_1, file_2)
-
- # Giving a non existing path/file.
- with pytest.raises(OSError):
- mkosi.copy_file("nullFilePath", file_1)
-
- # Copying when there's already content in both files.
- file_2.write_text("Testing copying content from file_1 to file_2, with previous data.")
- mkosi.copy_file(file_1, file_2)
- assert filecmp.cmp(file_1, file_2)
-
-
def test_parse_bytes() -> None:
assert mkosi.parse_bytes(None) == 0
assert mkosi.parse_bytes("1") == 512
--- /dev/null
+# SPDX-License-Identifier: LGPL-2.1+
+
+import filecmp
+from pathlib import Path
+
+import pytest
+
+from mkosi.install import copy_file
+
+def test_copy_file(tmpdir: Path) -> None:
+ dir_path = Path(tmpdir)
+ file_1 = Path(dir_path) / "file_1.txt"
+ file_2 = Path(dir_path) / "file_2.txt"
+ file_1.touch()
+ file_2.touch()
+
+ # Copying two empty files.
+ copy_file(file_1, file_2)
+ assert filecmp.cmp(file_1, file_2)
+
+ # Copying content from one file.
+ file_1.write_text("Testing copying content from this file to file_2.")
+ copy_file(file_1, file_2)
+ assert filecmp.cmp(file_1, file_2)
+
+ # Giving a non existing path/file.
+ with pytest.raises(OSError):
+ copy_file("nullFilePath", file_1)
+
+ # Copying when there's already content in both files.
+ file_2.write_text("Testing copying content from file_1 to file_2, with previous data.")
+ copy_file(file_1, file_2)
+ assert filecmp.cmp(file_1, file_2)