--- /dev/null
+import os
+import pathlib
+import zipfile
+import tempfile
+import functools
+import contextlib
+
+
+def from_package(package):
+ """
+ Return a Traversable object for the given package.
+
+ """
+ spec = package.__spec__
+ return from_traversable_resources(spec) or fallback_resources(spec)
+
+
+def from_traversable_resources(spec):
+ """
+ If the spec.loader implements TraversableResources,
+ directly or implicitly, it will have a ``files()`` method.
+ """
+ with contextlib.suppress(AttributeError):
+ return spec.loader.files()
+
+
+def fallback_resources(spec):
+ package_directory = pathlib.Path(spec.origin).parent
+ try:
+ archive_path = spec.loader.archive
+ rel_path = package_directory.relative_to(archive_path)
+ return zipfile.Path(archive_path, str(rel_path) + '/')
+ except Exception:
+ pass
+ return package_directory
+
+
+@contextlib.contextmanager
+def _tempfile(reader, suffix=''):
+ # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
+ # blocks due to the need to close the temporary file to work on Windows
+ # properly.
+ fd, raw_path = tempfile.mkstemp(suffix=suffix)
+ try:
+ os.write(fd, reader())
+ os.close(fd)
+ yield pathlib.Path(raw_path)
+ finally:
+ try:
+ os.remove(raw_path)
+ except FileNotFoundError:
+ pass
+
+
+@functools.singledispatch
+@contextlib.contextmanager
+def as_file(path):
+ """
+ Given a Traversable object, return that object as a
+ path on the local file system in a context manager.
+ """
+ with _tempfile(path.read_bytes, suffix=path.name) as local:
+ yield local
+
+
+@as_file.register(pathlib.Path)
+@contextlib.contextmanager
+def _(path):
+ """
+ Degenerate behavior for pathlib.Path objects.
+ """
+ yield path
_frozen_importlib_external = _bootstrap_external
import abc
import warnings
+from typing import Protocol, runtime_checkable
def _register(abstract_cls, *classes):
_register(ResourceReader, machinery.SourceFileLoader)
+
+
+@runtime_checkable
+class Traversable(Protocol):
+ """
+ An object with a subset of pathlib.Path methods suitable for
+ traversing directories and opening files.
+ """
+
+ @abc.abstractmethod
+ def iterdir(self):
+ """
+ Yield Traversable objects in self
+ """
+
+ @abc.abstractmethod
+ def read_bytes(self):
+ """
+ Read contents of self as bytes
+ """
+
+ @abc.abstractmethod
+ def read_text(self, encoding=None):
+ """
+ Read contents of self as bytes
+ """
+
+ @abc.abstractmethod
+ def is_dir(self):
+ """
+ Return True if self is a dir
+ """
+
+ @abc.abstractmethod
+ def is_file(self):
+ """
+ Return True if self is a file
+ """
+
+ @abc.abstractmethod
+ def joinpath(self, child):
+ """
+ Return Traversable child in self
+ """
+
+ @abc.abstractmethod
+ def __truediv__(self, child):
+ """
+ Return Traversable child in self
+ """
+
+ @abc.abstractmethod
+ def open(self, mode='r', *args, **kwargs):
+ """
+ mode may be 'r' or 'rb' to open as text or binary. Return a handle
+ suitable for reading (same as pathlib.Path.open).
+
+ When opening as text, accepts encoding parameters such as those
+ accepted by io.TextIOWrapper.
+ """
+
+ @abc.abstractproperty
+ def name(self):
+ # type: () -> str
+ """
+ The base name of this object without any parent references.
+ """
+
+
+class TraversableResources(ResourceReader):
+ @abc.abstractmethod
+ def files(self):
+ """Return a Traversable object for the loaded package."""
+
+ def open_resource(self, resource):
+ return self.files().joinpath(resource).open('rb')
+
+ def resource_path(self, resource):
+ raise FileNotFoundError(resource)
+
+ def is_resource(self, path):
+ return self.files().joinpath(path).isfile()
+
+ def contents(self):
+ return (item.name for item in self.files().iterdir())
import os
-import tempfile
from . import abc as resources_abc
+from . import _common
+from ._common import as_file
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
-from typing import Iterable, Iterator, Optional, Union # noqa: F401
+from typing import ContextManager, Iterable, Optional, Union
from typing import cast
from typing.io import BinaryIO, TextIO
__all__ = [
'Package',
'Resource',
+ 'as_file',
'contents',
+ 'files',
'is_resource',
'open_binary',
'open_text',
Resource = Union[str, os.PathLike]
+def _resolve(name) -> ModuleType:
+ """If name is a string, resolve to a module."""
+ if hasattr(name, '__spec__'):
+ return name
+ return import_module(name)
+
+
def _get_package(package) -> ModuleType:
"""Take a package name or module object and return the module.
- If a name, the module is imported. If the passed or imported module
+ If a name, the module is imported. If the resolved module
object is not a package, raise an exception.
"""
- if hasattr(package, '__spec__'):
- if package.__spec__.submodule_search_locations is None:
- raise TypeError('{!r} is not a package'.format(
- package.__spec__.name))
- else:
- return package
- else:
- module = import_module(package)
- if module.__spec__.submodule_search_locations is None:
- raise TypeError('{!r} is not a package'.format(package))
- else:
- return module
+ module = _resolve(package)
+ if module.__spec__.submodule_search_locations is None:
+ raise TypeError('{!r} is not a package'.format(package))
+ return module
def _normalize_path(path) -> str:
parent, file_name = os.path.split(path)
if parent:
raise ValueError('{!r} must be only a file name'.format(path))
- else:
- return file_name
+ return file_name
def _get_resource_reader(
reader = _get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
- _check_location(package)
- absolute_package_path = os.path.abspath(package.__spec__.origin)
+ absolute_package_path = os.path.abspath(
+ package.__spec__.origin or 'non-existent file')
package_path = os.path.dirname(absolute_package_path)
full_path = os.path.join(package_path, resource)
try:
message = '{!r} resource not found in {!r}'.format(
resource, package_name)
raise FileNotFoundError(message)
- else:
- return BytesIO(data)
+ return BytesIO(data)
def open_text(package: Package,
encoding: str = 'utf-8',
errors: str = 'strict') -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
- resource = _normalize_path(resource)
- package = _get_package(package)
- reader = _get_resource_reader(package)
- if reader is not None:
- return TextIOWrapper(reader.open_resource(resource), encoding, errors)
- _check_location(package)
- absolute_package_path = os.path.abspath(package.__spec__.origin)
- package_path = os.path.dirname(absolute_package_path)
- full_path = os.path.join(package_path, resource)
- try:
- return open(full_path, mode='r', encoding=encoding, errors=errors)
- except OSError:
- # Just assume the loader is a resource loader; all the relevant
- # importlib.machinery loaders are and an AttributeError for
- # get_data() will make it clear what is needed from the loader.
- loader = cast(ResourceLoader, package.__spec__.loader)
- data = None
- if hasattr(package.__spec__.loader, 'get_data'):
- with suppress(OSError):
- data = loader.get_data(full_path)
- if data is None:
- package_name = package.__spec__.name
- message = '{!r} resource not found in {!r}'.format(
- resource, package_name)
- raise FileNotFoundError(message)
- else:
- return TextIOWrapper(BytesIO(data), encoding, errors)
+ return TextIOWrapper(
+ open_binary(package, resource), encoding=encoding, errors=errors)
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
- resource = _normalize_path(resource)
- package = _get_package(package)
with open_binary(package, resource) as fp:
return fp.read()
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
- resource = _normalize_path(resource)
- package = _get_package(package)
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
-@contextmanager
-def path(package: Package, resource: Resource) -> Iterator[Path]:
+def files(package: Package) -> resources_abc.Traversable:
+ """
+ Get a Traversable resource from a package
+ """
+ return _common.from_package(_get_package(package))
+
+
+def path(
+ package: Package, resource: Resource,
+ ) -> 'ContextManager[Path]':
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
raised if the file was deleted prior to the context manager
exiting).
"""
- resource = _normalize_path(resource)
- package = _get_package(package)
- reader = _get_resource_reader(package)
- if reader is not None:
- try:
- yield Path(reader.resource_path(resource))
- return
- except FileNotFoundError:
- pass
- else:
- _check_location(package)
- # Fall-through for both the lack of resource_path() *and* if
- # resource_path() raises FileNotFoundError.
- package_directory = Path(package.__spec__.origin).parent
- file_path = package_directory / resource
- if file_path.exists():
- yield file_path
- else:
- with open_binary(package, resource) as fp:
- data = fp.read()
- # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
- # blocks due to the need to close the temporary file to work on
- # Windows properly.
- fd, raw_path = tempfile.mkstemp()
- try:
- os.write(fd, data)
- os.close(fd)
- yield Path(raw_path)
- finally:
- try:
- os.remove(raw_path)
- except FileNotFoundError:
- pass
+ reader = _get_resource_reader(_get_package(package))
+ return (
+ _path_from_reader(reader, resource)
+ if reader else
+ _common.as_file(files(package).joinpath(_normalize_path(resource)))
+ )
+
+
+@contextmanager
+def _path_from_reader(reader, resource):
+ norm_resource = _normalize_path(resource)
+ with suppress(FileNotFoundError):
+ yield Path(reader.resource_path(norm_resource))
+ return
+ opener_reader = reader.open_resource(norm_resource)
+ with _common._tempfile(opener_reader.read, suffix=norm_resource) as res:
+ yield res
def is_resource(package: Package, name: str) -> bool:
reader = _get_resource_reader(package)
if reader is not None:
return reader.is_resource(name)
- try:
- package_contents = set(contents(package))
- except (NotADirectoryError, FileNotFoundError):
- return False
+ package_contents = set(contents(package))
if name not in package_contents:
return False
- # Just because the given file_name lives as an entry in the package's
- # contents doesn't necessarily mean it's a resource. Directories are not
- # resources, so let's try to find out if it's a directory or not.
- path = Path(package.__spec__.origin).parent / name
- return path.is_file()
+ return (_common.from_package(package) / name).is_file()
def contents(package: Package) -> Iterable[str]:
if reader is not None:
return reader.contents()
# Is the package a namespace package? By definition, namespace packages
- # cannot have resources. We could use _check_location() and catch the
- # exception, but that's extra work, so just inline the check.
- elif package.__spec__.origin is None or not package.__spec__.has_location:
+ # cannot have resources.
+ namespace = (
+ package.__spec__.origin is None or
+ package.__spec__.origin == 'namespace'
+ )
+ if namespace or not package.__spec__.has_location:
return ()
- else:
- package_directory = Path(package.__spec__.origin).parent
- return os.listdir(package_directory)
+ return list(item.name for item in _common.from_package(package).iterdir())