]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
as_file() backport improvements
authorDaanDeMeyer <daan.j.demeyer@gmail.com>
Tue, 1 Jul 2025 19:42:12 +0000 (21:42 +0200)
committerDaanDeMeyer <daan.j.demeyer@gmail.com>
Tue, 1 Jul 2025 20:47:11 +0000 (22:47 +0200)
- Simplify
- Fully type
- Move to mkosi.resources

We're going to extend it in the next commit, so no point in keeping
it the same as upstream anymore.

mkosi/backport.py [deleted file]
mkosi/resources/__init__.py
mkosi/util.py

diff --git a/mkosi/backport.py b/mkosi/backport.py
deleted file mode 100644 (file)
index e03fcf4..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-# SPDX-License-Identifier: PSF-2.0
-# Copied from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py
-
-# We backport as_file() from python 3.12 here temporarily since it added directory support.
-# TODO: Remove once minimum python version is 3.12.
-
-import contextlib
-import functools
-import os
-import tempfile
-from pathlib import Path
-from typing import no_type_check
-
-
-@no_type_check
-@contextlib.contextmanager
-def _tempfile(
-    reader,
-    suffix="",
-    # gh-93353: Keep a reference to call os.remove() in late Python
-    # finalization.
-    *,
-    _os_remove=os.remove,
-):
-    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
-    # blocks due to the need to close the temporary file to work on Windows
-    # properly.
-    fd, raw_path = tempfile.mkstemp(suffix=suffix)
-    try:
-        try:
-            os.write(fd, reader())
-        finally:
-            os.close(fd)
-        yield Path(raw_path)
-    finally:
-        try:
-            _os_remove(raw_path)
-        except FileNotFoundError:
-            pass
-
-
-@no_type_check
-def _temp_file(path):
-    return _tempfile(path.read_bytes, suffix=path.name)
-
-
-@no_type_check
-def _is_present_dir(path) -> bool:
-    """
-    Some Traversables implement ``is_dir()`` to raise an
-    exception (i.e. ``FileNotFoundError``) when the
-    directory doesn't exist. This function wraps that call
-    to always return a boolean and only return True
-    if there's a dir and it exists.
-    """
-    with contextlib.suppress(FileNotFoundError):
-        return path.is_dir()
-    return False
-
-
-@no_type_check
-@functools.singledispatch
-def as_file(path):
-    """
-    Given a Traversable object, return that object as a
-    path on the local file system in a context manager.
-    """
-    return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
-
-
-@no_type_check
-@contextlib.contextmanager
-def _temp_path(dir: tempfile.TemporaryDirectory):
-    """
-    Wrap tempfile.TemporyDirectory to return a pathlib object.
-    """
-    with dir as result:
-        yield Path(result)
-
-
-@no_type_check
-@contextlib.contextmanager
-def _temp_dir(path):
-    """
-    Given a traversable dir, recursively replicate the whole tree
-    to the file system in a context manager.
-    """
-    assert path.is_dir()
-    with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
-        yield _write_contents(temp_dir, path)
-
-
-@no_type_check
-def _write_contents(target, source):
-    child = target.joinpath(source.name)
-    if source.is_dir():
-        child.mkdir()
-        for item in source.iterdir():
-            _write_contents(child, item)
-    else:
-        child.write_bytes(source.read_bytes())
-    return child
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..d4b9f980294957aca0c6f0b49f94a74127174ee3 100644 (file)
@@ -0,0 +1,76 @@
+# SPDX-License-Identifier: PSF-2.0
+# Based on code from https://github.com/python/cpython/blob/main/Lib/importlib/resources/_common.py
+
+import contextlib
+import functools
+import os
+import sys
+import tempfile
+from collections.abc import Iterator
+from contextlib import AbstractContextManager
+from pathlib import Path
+
+if sys.version_info >= (3, 11):
+    from importlib.resources.abc import Traversable
+else:
+    from importlib.abc import Traversable
+
+
+@contextlib.contextmanager
+def temporary_file(path: Traversable, suffix: str = "") -> Iterator[Path]:
+    fd, raw_path = tempfile.mkstemp(suffix=suffix)
+    try:
+        try:
+            os.write(fd, path.read_bytes())
+        finally:
+            os.close(fd)
+        yield Path(raw_path)
+    finally:
+        try:
+            os.remove(raw_path)
+        except FileNotFoundError:
+            pass
+
+
+def dir_is_present(path: Traversable) -> bool:
+    """
+    Some Traversables implement ``is_dir()`` to raise an
+    exception (i.e. ``FileNotFoundError``) when the
+    directory doesn't exist. This function wraps that call
+    to always return a boolean and only return True
+    if there's a dir and it exists.
+    """
+    with contextlib.suppress(FileNotFoundError):
+        return path.is_dir()
+    return False
+
+
+@functools.singledispatch
+def as_file(path: Traversable) -> AbstractContextManager[Path]:
+    """
+    Given a Traversable object, return that object as a
+    path on the local file system in a context manager.
+    """
+    return temporary_dir(path) if dir_is_present(path) else temporary_file(path, suffix=path.name)
+
+
+@contextlib.contextmanager
+def temporary_dir(path: Traversable) -> Iterator[Path]:
+    """
+    Given a traversable dir, recursively replicate the whole tree
+    to the file system in a context manager.
+    """
+    assert path.is_dir()
+    with tempfile.TemporaryDirectory() as temp_dir:
+        yield write_contents(Path(temp_dir), path)
+
+
+def write_contents(target: Path, source: Traversable) -> Path:
+    child = target.joinpath(source.name)
+    if source.is_dir():
+        child.mkdir()
+        for item in source.iterdir():
+            write_contents(child, item)
+    else:
+        child.write_bytes(source.read_bytes())
+    return child
index 81710924f0632411f6ff7150afdfea230821b3d2..64712b6a194904f9cae77ba2115864c35f3819da 100644 (file)
@@ -22,8 +22,8 @@ from pathlib import Path
 from types import ModuleType
 from typing import IO, Any, Callable, Optional, Protocol, TypeVar, Union
 
-from mkosi.backport import as_file
 from mkosi.log import die
+from mkosi.resources import as_file
 
 T = TypeVar("T")
 V = TypeVar("V")