From: DaanDeMeyer Date: Tue, 1 Jul 2025 19:42:12 +0000 (+0200) Subject: as_file() backport improvements X-Git-Tag: v26~192^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7ad53647c2a548b3391532e48e1ec0bd921f3681;p=thirdparty%2Fmkosi.git as_file() backport improvements - Simplify - Fully type - Move to mkosi.resources We're going to extend it in the next commit, so no point in keeping it the same as upstream anymore. --- diff --git a/mkosi/backport.py b/mkosi/backport.py deleted file mode 100644 index e03fcf42c..000000000 --- a/mkosi/backport.py +++ /dev/null @@ -1,102 +0,0 @@ -# SPDX-License-Identifier: PSF-2.0 -# Copied from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py - -# We backport as_file() from python 3.12 here temporarily since it added directory support. -# TODO: Remove once minimum python version is 3.12. - -import contextlib -import functools -import os -import tempfile -from pathlib import Path -from typing import no_type_check - - -@no_type_check -@contextlib.contextmanager -def _tempfile( - reader, - suffix="", - # gh-93353: Keep a reference to call os.remove() in late Python - # finalization. - *, - _os_remove=os.remove, -): - # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' - # blocks due to the need to close the temporary file to work on Windows - # properly. - fd, raw_path = tempfile.mkstemp(suffix=suffix) - try: - try: - os.write(fd, reader()) - finally: - os.close(fd) - yield Path(raw_path) - finally: - try: - _os_remove(raw_path) - except FileNotFoundError: - pass - - -@no_type_check -def _temp_file(path): - return _tempfile(path.read_bytes, suffix=path.name) - - -@no_type_check -def _is_present_dir(path) -> bool: - """ - Some Traversables implement ``is_dir()`` to raise an - exception (i.e. ``FileNotFoundError``) when the - directory doesn't exist. This function wraps that call - to always return a boolean and only return True - if there's a dir and it exists. - """ - with contextlib.suppress(FileNotFoundError): - return path.is_dir() - return False - - -@no_type_check -@functools.singledispatch -def as_file(path): - """ - Given a Traversable object, return that object as a - path on the local file system in a context manager. - """ - return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) - - -@no_type_check -@contextlib.contextmanager -def _temp_path(dir: tempfile.TemporaryDirectory): - """ - Wrap tempfile.TemporyDirectory to return a pathlib object. - """ - with dir as result: - yield Path(result) - - -@no_type_check -@contextlib.contextmanager -def _temp_dir(path): - """ - Given a traversable dir, recursively replicate the whole tree - to the file system in a context manager. - """ - assert path.is_dir() - with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: - yield _write_contents(temp_dir, path) - - -@no_type_check -def _write_contents(target, source): - child = target.joinpath(source.name) - if source.is_dir(): - child.mkdir() - for item in source.iterdir(): - _write_contents(child, item) - else: - child.write_bytes(source.read_bytes()) - return child diff --git a/mkosi/resources/__init__.py b/mkosi/resources/__init__.py index e69de29bb..d4b9f9802 100644 --- a/mkosi/resources/__init__.py +++ b/mkosi/resources/__init__.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: PSF-2.0 +# Based on code from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py + +import contextlib +import functools +import os +import sys +import tempfile +from collections.abc import Iterator +from contextlib import AbstractContextManager +from pathlib import Path + +if sys.version_info >= (3, 11): + from importlib.resources.abc import Traversable +else: + from importlib.abc import Traversable + + +@contextlib.contextmanager +def temporary_file(path: Traversable, suffix: str = "") -> Iterator[Path]: + fd, raw_path = tempfile.mkstemp(suffix=suffix) + try: + try: + os.write(fd, path.read_bytes()) + finally: + os.close(fd) + yield Path(raw_path) + finally: + try: + os.remove(raw_path) + except FileNotFoundError: + pass + + +def dir_is_present(path: Traversable) -> bool: + """ + Some Traversables implement ``is_dir()`` to raise an + exception (i.e. ``FileNotFoundError``) when the + directory doesn't exist. This function wraps that call + to always return a boolean and only return True + if there's a dir and it exists. + """ + with contextlib.suppress(FileNotFoundError): + return path.is_dir() + return False + + +@functools.singledispatch +def as_file(path: Traversable) -> AbstractContextManager[Path]: + """ + Given a Traversable object, return that object as a + path on the local file system in a context manager. + """ + return temporary_dir(path) if dir_is_present(path) else temporary_file(path, suffix=path.name) + + +@contextlib.contextmanager +def temporary_dir(path: Traversable) -> Iterator[Path]: + """ + Given a traversable dir, recursively replicate the whole tree + to the file system in a context manager. + """ + assert path.is_dir() + with tempfile.TemporaryDirectory() as temp_dir: + yield write_contents(Path(temp_dir), path) + + +def write_contents(target: Path, source: Traversable) -> Path: + child = target.joinpath(source.name) + if source.is_dir(): + child.mkdir() + for item in source.iterdir(): + write_contents(child, item) + else: + child.write_bytes(source.read_bytes()) + return child diff --git a/mkosi/util.py b/mkosi/util.py index 81710924f..64712b6a1 100644 --- a/mkosi/util.py +++ b/mkosi/util.py @@ -22,8 +22,8 @@ from pathlib import Path from types import ModuleType from typing import IO, Any, Callable, Optional, Protocol, TypeVar, Union -from mkosi.backport import as_file from mkosi.log import die +from mkosi.resources import as_file T = TypeVar("T") V = TypeVar("V")