try:
os.write(fd, reader())
os.close(fd)
+ del reader
yield pathlib.Path(raw_path)
finally:
try:
@functools.singledispatch
-@contextlib.contextmanager
def as_file(path):
"""
Given a Traversable object, return that object as a
path on the local file system in a context manager.
"""
- with _tempfile(path.read_bytes, suffix=path.name) as local:
- yield local
+ return _tempfile(path.read_bytes, suffix=path.name)
@as_file.register(pathlib.Path)
class ZipReader(abc.TraversableResources):
def __init__(self, loader, module):
_, _, name = module.rpartition('.')
- prefix = loader.prefix.replace('\\', '/') + name + '/'
- self.path = zipfile.Path(loader.archive, prefix)
+ self.prefix = loader.prefix.replace('\\', '/') + name + '/'
+ self.archive = loader.archive
def open_resource(self, resource):
try:
return target.is_file() and target.exists()
def files(self):
- return self.path
+ return zipfile.Path(self.archive, self.prefix)
import os
+import io
from . import _common
from ._common import as_file, files
-from contextlib import contextmanager, suppress
+from contextlib import suppress
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from typing import ContextManager, Iterable, Union
from typing import cast
from typing.io import BinaryIO, TextIO
+from collections.abc import Sequence
+from functools import singledispatch
__all__ = [
"""
reader = _common.get_resource_reader(_common.get_package(package))
return (
- _path_from_reader(reader, resource)
+ _path_from_reader(reader, _common.normalize_path(resource))
if reader else
_common.as_file(
_common.files(package).joinpath(_common.normalize_path(resource)))
)
-@contextmanager
def _path_from_reader(reader, resource):
- norm_resource = _common.normalize_path(resource)
+ return _path_from_resource_path(reader, resource) or \
+ _path_from_open_resource(reader, resource)
+
+
+def _path_from_resource_path(reader, resource):
with suppress(FileNotFoundError):
- yield Path(reader.resource_path(norm_resource))
- return
- opener_reader = reader.open_resource(norm_resource)
- with _common._tempfile(opener_reader.read, suffix=norm_resource) as res:
- yield res
+ return Path(reader.resource_path(resource))
+
+
+def _path_from_open_resource(reader, resource):
+ saved = io.BytesIO(reader.open_resource(resource).read())
+ return _common._tempfile(saved.read, suffix=resource)
def is_resource(package: Package, name: str) -> bool:
package = _common.get_package(package)
reader = _common.get_resource_reader(package)
if reader is not None:
- return reader.contents()
+ return _ensure_sequence(reader.contents())
# Is the package a namespace package? By definition, namespace packages
# cannot have resources.
namespace = (
if namespace or not package.__spec__.has_location:
return ()
return list(item.name for item in _common.from_package(package).iterdir())
+
+
+@singledispatch
+def _ensure_sequence(iterable):
+ return list(iterable)
+
+
+@_ensure_sequence.register(Sequence)
+def _(iterable):
+ return iterable
import sys
import unittest
+import uuid
+import pathlib
from . import data01
from . import zipdata01, zipdata02
from . import util
from importlib import resources, import_module
+from test.support import import_helper
+from test.support.os_helper import unlink
class ResourceTests:
'test.test_importlib.data03.namespace', 'resource1.txt')
+class DeletingZipsTest(unittest.TestCase):
+ """Having accessed resources in a zip file should not keep an open
+ reference to the zip.
+ """
+ ZIP_MODULE = zipdata01
+
+ def setUp(self):
+ modules = import_helper.modules_setup()
+ self.addCleanup(import_helper.modules_cleanup, *modules)
+
+ data_path = pathlib.Path(self.ZIP_MODULE.__file__)
+ data_dir = data_path.parent
+ self.source_zip_path = data_dir / 'ziptestdata.zip'
+ self.zip_path = pathlib.Path('{}.zip'.format(uuid.uuid4())).absolute()
+ self.zip_path.write_bytes(self.source_zip_path.read_bytes())
+ sys.path.append(str(self.zip_path))
+ self.data = import_module('ziptestdata')
+
+ def tearDown(self):
+ try:
+ sys.path.remove(str(self.zip_path))
+ except ValueError:
+ pass
+
+ try:
+ del sys.path_importer_cache[str(self.zip_path)]
+ del sys.modules[self.data.__name__]
+ except KeyError:
+ pass
+
+ try:
+ unlink(self.zip_path)
+ except OSError:
+ # If the test fails, this will probably fail too
+ pass
+
+ def test_contents_does_not_keep_open(self):
+ c = resources.contents('ziptestdata')
+ self.zip_path.unlink()
+ del c
+
+ def test_is_resource_does_not_keep_open(self):
+ c = resources.is_resource('ziptestdata', 'binary.file')
+ self.zip_path.unlink()
+ del c
+
+ def test_is_resource_failure_does_not_keep_open(self):
+ c = resources.is_resource('ziptestdata', 'not-present')
+ self.zip_path.unlink()
+ del c
+
+ @unittest.skip("Desired but not supported.")
+ def test_path_does_not_keep_open(self):
+ c = resources.path('ziptestdata', 'binary.file')
+ self.zip_path.unlink()
+ del c
+
+ def test_entered_path_does_not_keep_open(self):
+ # This is what certifi does on import to make its bundle
+ # available for the process duration.
+ c = resources.path('ziptestdata', 'binary.file').__enter__()
+ self.zip_path.unlink()
+ del c
+
+ def test_read_binary_does_not_keep_open(self):
+ c = resources.read_binary('ziptestdata', 'binary.file')
+ self.zip_path.unlink()
+ del c
+
+ def test_read_text_does_not_keep_open(self):
+ c = resources.read_text('ziptestdata', 'utf-8.file', encoding='utf-8')
+ self.zip_path.unlink()
+ del c
+
+
if __name__ == '__main__':
unittest.main()
--- /dev/null
+In ``importlib.resources``, ``.path`` method is more aggressive about
+releasing handles to zipfile objects early, enabling use-cases like certifi
+to leave the context open but delete the underlying zip file.