From: Daan De Meyer Date: Thu, 5 Sep 2024 11:45:59 +0000 (+0200) Subject: Move code backported from cpython upstream to backport.py X-Git-Tag: v25~314^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8f7c7b366f11d836906b6779e153d93d1f41a87d;p=thirdparty%2Fmkosi.git Move code backported from cpython upstream to backport.py --- diff --git a/mkosi/backport.py b/mkosi/backport.py new file mode 100644 index 000000000..4b0902eef --- /dev/null +++ b/mkosi/backport.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: PSF-2.0 +# Copied from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py + +# We backport as_file() from python 3.12 here temporarily since it added directory support. +# TODO: Remove once minimum python version is 3.12. + +import contextlib +import functools +import os +import tempfile +from pathlib import Path +from typing import no_type_check + + +@no_type_check +@contextlib.contextmanager +def _tempfile( + reader, + suffix='', + # gh-93353: Keep a reference to call os.remove() in late Python + # finalization. + *, + _os_remove=os.remove, +): + # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' + # blocks due to the need to close the temporary file to work on Windows + # properly. + fd, raw_path = tempfile.mkstemp(suffix=suffix) + try: + try: + os.write(fd, reader()) + finally: + os.close(fd) + del reader + yield Path(raw_path) + finally: + try: + _os_remove(raw_path) + except FileNotFoundError: + pass + +@no_type_check +def _temp_file(path): + return _tempfile(path.read_bytes, suffix=path.name) + +@no_type_check +def _is_present_dir(path) -> bool: + """ + Some Traversables implement ``is_dir()`` to raise an + exception (i.e. ``FileNotFoundError``) when the + directory doesn't exist. This function wraps that call + to always return a boolean and only return True + if there's a dir and it exists. + """ + with contextlib.suppress(FileNotFoundError): + return path.is_dir() + return False + +@no_type_check +@functools.singledispatch +def as_file(path): + """ + Given a Traversable object, return that object as a + path on the local file system in a context manager. + """ + return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) + +@no_type_check +@contextlib.contextmanager +def _temp_path(dir: tempfile.TemporaryDirectory): + """ + Wrap tempfile.TemporyDirectory to return a pathlib object. + """ + with dir as result: + yield Path(result) + +@no_type_check +@contextlib.contextmanager +def _temp_dir(path): + """ + Given a traversable dir, recursively replicate the whole tree + to the file system in a context manager. + """ + assert path.is_dir() + with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: + yield _write_contents(temp_dir, path) + +@no_type_check +def _write_contents(target, source): + child = target.joinpath(source.name) + if source.is_dir(): + child.mkdir() + for item in source.iterdir(): + _write_contents(child, item) + else: + child.write_bytes(source.read_bytes()) + return child diff --git a/mkosi/util.py b/mkosi/util.py index b66b7b842..e55b54d3b 100644 --- a/mkosi/util.py +++ b/mkosi/util.py @@ -20,8 +20,9 @@ import tempfile from collections.abc import Hashable, Iterable, Iterator, Mapping, Sequence from pathlib import Path from types import ModuleType -from typing import Any, Callable, Optional, TypeVar, no_type_check +from typing import Any, Callable, Optional, TypeVar +from mkosi.backport import as_file from mkosi.log import die from mkosi.types import PathString @@ -187,97 +188,6 @@ def parents_below(path: Path, below: Path) -> list[Path]: @contextlib.contextmanager def resource_path(mod: ModuleType) -> Iterator[Path]: - - # We backport as_file() from python 3.12 here temporarily since it added directory support. - # TODO: Remove once minimum python version is 3.12. - - # SPDX-License-Identifier: PSF-2.0 - # Copied from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py - - @no_type_check - @contextlib.contextmanager - def _tempfile( - reader, - suffix='', - # gh-93353: Keep a reference to call os.remove() in late Python - # finalization. - *, - _os_remove=os.remove, - ): - # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' - # blocks due to the need to close the temporary file to work on Windows - # properly. - fd, raw_path = tempfile.mkstemp(suffix=suffix) - try: - try: - os.write(fd, reader()) - finally: - os.close(fd) - del reader - yield Path(raw_path) - finally: - try: - _os_remove(raw_path) - except FileNotFoundError: - pass - - @no_type_check - def _temp_file(path): - return _tempfile(path.read_bytes, suffix=path.name) - - @no_type_check - def _is_present_dir(path) -> bool: - """ - Some Traversables implement ``is_dir()`` to raise an - exception (i.e. ``FileNotFoundError``) when the - directory doesn't exist. This function wraps that call - to always return a boolean and only return True - if there's a dir and it exists. - """ - with contextlib.suppress(FileNotFoundError): - return path.is_dir() - return False - - @no_type_check - @functools.singledispatch - def as_file(path): - """ - Given a Traversable object, return that object as a - path on the local file system in a context manager. - """ - return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) - - @no_type_check - @contextlib.contextmanager - def _temp_path(dir: tempfile.TemporaryDirectory): - """ - Wrap tempfile.TemporyDirectory to return a pathlib object. - """ - with dir as result: - yield Path(result) - - @no_type_check - @contextlib.contextmanager - def _temp_dir(path): - """ - Given a traversable dir, recursively replicate the whole tree - to the file system in a context manager. - """ - assert path.is_dir() - with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: - yield _write_contents(temp_dir, path) - - @no_type_check - def _write_contents(target, source): - child = target.joinpath(source.name) - if source.is_dir(): - child.mkdir() - for item in source.iterdir(): - _write_contents(child, item) - else: - child.write_bytes(source.read_bytes()) - return child - t = importlib.resources.files(mod) with as_file(t) as p: # Make sure any temporary directory that the resources are unpacked in is accessible to the invoking user so