]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-59110: zipimport: support namespace packages when no directory entry exists (GH...
authorSerhiy Storchaka <storchaka@gmail.com>
Thu, 4 Jul 2024 15:04:24 +0000 (18:04 +0300)
committerGitHub <noreply@github.com>
Thu, 4 Jul 2024 15:04:24 +0000 (15:04 +0000)
Lib/test/test_importlib/test_namespace_pkgs.py
Lib/test/test_zipimport.py
Lib/zipimport.py
Misc/NEWS.d/next/Library/2024-07-04-17-36-03.gh-issue-59110.IlI9Fz.rst [new file with mode: 0644]

index 072e198795d39403d2856acaa377235a13cbce52..cbbdada3b010a7ca53072377d8b884678828356e 100644 (file)
@@ -286,25 +286,24 @@ class DynamicPathCalculation(NamespacePackageTest):
 
 class ZipWithMissingDirectory(NamespacePackageTest):
     paths = ['missing_directory.zip']
+    # missing_directory.zip contains:
+    #   Length      Date    Time    Name
+    # ---------  ---------- -----   ----
+    #        29  2012-05-03 18:13   foo/one.py
+    #         0  2012-05-03 20:57   bar/
+    #        38  2012-05-03 20:57   bar/two.py
+    # ---------                     -------
+    #        67                     3 files
 
-    @unittest.expectedFailure
     def test_missing_directory(self):
-        # This will fail because missing_directory.zip contains:
-        #   Length      Date    Time    Name
-        # ---------  ---------- -----   ----
-        #        29  2012-05-03 18:13   foo/one.py
-        #         0  2012-05-03 20:57   bar/
-        #        38  2012-05-03 20:57   bar/two.py
-        # ---------                     -------
-        #        67                     3 files
-
-        # Because there is no 'foo/', the zipimporter currently doesn't
-        #  know that foo is a namespace package
-
         import foo.one
+        self.assertEqual(foo.one.attr, 'portion1 foo one')
+
+    def test_missing_directory2(self):
+        import foo
+        self.assertFalse(hasattr(foo, 'one'))
 
     def test_present_directory(self):
-        # This succeeds because there is a "bar/" in the zip file
         import bar.two
         self.assertEqual(bar.two.attr, 'missing_directory foo two')
 
index 0bae54d26c64f17bfb0e83fa58335f99053b2b86..d70fa3e863b98ccf706105887d7b68bf6a3c28c1 100644 (file)
@@ -296,6 +296,81 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                  packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
         self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)
 
+    def testPackageExplicitDirectories(self):
+        # Test explicit namespace packages with explicit directory entries.
+        self.addCleanup(os_helper.unlink, TEMP_ZIP)
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            z.mkdir('a')
+            z.writestr('a/__init__.py', test_src)
+            z.mkdir('a/b')
+            z.writestr('a/b/__init__.py', test_src)
+            z.mkdir('a/b/c')
+            z.writestr('a/b/c/__init__.py', test_src)
+            z.writestr('a/b/c/d.py', test_src)
+        self._testPackage(initfile='__init__.py')
+
+    def testPackageImplicitDirectories(self):
+        # Test explicit namespace packages without explicit directory entries.
+        self.addCleanup(os_helper.unlink, TEMP_ZIP)
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            z.writestr('a/__init__.py', test_src)
+            z.writestr('a/b/__init__.py', test_src)
+            z.writestr('a/b/c/__init__.py', test_src)
+            z.writestr('a/b/c/d.py', test_src)
+        self._testPackage(initfile='__init__.py')
+
+    def testNamespacePackageExplicitDirectories(self):
+        # Test implicit namespace packages with explicit directory entries.
+        self.addCleanup(os_helper.unlink, TEMP_ZIP)
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            z.mkdir('a')
+            z.mkdir('a/b')
+            z.mkdir('a/b/c')
+            z.writestr('a/b/c/d.py', test_src)
+        self._testPackage(initfile=None)
+
+    def testNamespacePackageImplicitDirectories(self):
+        # Test implicit namespace packages without explicit directory entries.
+        self.addCleanup(os_helper.unlink, TEMP_ZIP)
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            z.writestr('a/b/c/d.py', test_src)
+        self._testPackage(initfile=None)
+
+    def _testPackage(self, initfile):
+        zi = zipimport.zipimporter(os.path.join(TEMP_ZIP, 'a'))
+        if initfile is None:
+            # XXX Should it work?
+            self.assertRaises(zipimport.ZipImportError, zi.is_package, 'b')
+            self.assertRaises(zipimport.ZipImportError, zi.get_source, 'b')
+            self.assertRaises(zipimport.ZipImportError, zi.get_code, 'b')
+        else:
+            self.assertTrue(zi.is_package('b'))
+            self.assertEqual(zi.get_source('b'), test_src)
+            self.assertEqual(zi.get_code('b').co_filename,
+                             os.path.join(TEMP_ZIP, 'a', 'b', initfile))
+
+        sys.path.insert(0, TEMP_ZIP)
+        self.assertNotIn('a', sys.modules)
+
+        mod = importlib.import_module(f'a.b')
+        self.assertIn('a', sys.modules)
+        self.assertIs(sys.modules['a.b'], mod)
+        if initfile is None:
+            self.assertIsNone(mod.__file__)
+        else:
+            self.assertEqual(mod.__file__,
+                             os.path.join(TEMP_ZIP, 'a', 'b', initfile))
+        self.assertEqual(len(mod.__path__), 1, mod.__path__)
+        self.assertEqual(mod.__path__[0], os.path.join(TEMP_ZIP, 'a', 'b'))
+
+        mod2 = importlib.import_module(f'a.b.c.d')
+        self.assertIn('a.b.c', sys.modules)
+        self.assertIn('a.b.c.d', sys.modules)
+        self.assertIs(sys.modules['a.b.c.d'], mod2)
+        self.assertIs(mod.c.d, mod2)
+        self.assertEqual(mod2.__file__,
+                         os.path.join(TEMP_ZIP, 'a', 'b', 'c', 'd.py'))
+
     def testMixedNamespacePackage(self):
         # Test implicit namespace packages spread between a
         # real filesystem and a zip archive.
@@ -520,6 +595,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                  packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
                  packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
                  "spam" + pyc_ext: (NOW, test_pyc)}
+        extra_files = [packdir, packdir2]
         self.addCleanup(os_helper.unlink, TEMP_ZIP)
         with ZipFile(TEMP_ZIP, "w") as z:
             for name, (mtime, data) in files.items():
@@ -529,10 +605,10 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                 z.writestr(zinfo, data)
 
         zi = zipimport.zipimporter(TEMP_ZIP)
-        self.assertEqual(zi._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi._get_files()), sorted([*files, *extra_files]))
         # Check that the file information remains accurate after reloading
         zi.invalidate_caches()
-        self.assertEqual(zi._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi._get_files()), sorted([*files, *extra_files]))
         # Add a new file to the ZIP archive
         newfile = {"spam2" + pyc_ext: (NOW, test_pyc)}
         files.update(newfile)
@@ -544,7 +620,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                 z.writestr(zinfo, data)
         # Check that we can detect the new file after invalidating the cache
         zi.invalidate_caches()
-        self.assertEqual(zi._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi._get_files()), sorted([*files, *extra_files]))
         spec = zi.find_spec('spam2')
         self.assertIsNotNone(spec)
         self.assertIsInstance(spec.loader, zipimport.zipimporter)
@@ -562,6 +638,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                  packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
                  packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
                  "spam" + pyc_ext: (NOW, test_pyc)}
+        extra_files = [packdir, packdir2]
         self.addCleanup(os_helper.unlink, TEMP_ZIP)
         with ZipFile(TEMP_ZIP, "w") as z:
             for name, (mtime, data) in files.items():
@@ -571,10 +648,10 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                 z.writestr(zinfo, data)
 
         zi = zipimport.zipimporter(TEMP_ZIP)
-        self.assertEqual(zi._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi._get_files()), sorted([*files, *extra_files]))
         # Zipimporter for the same path.
         zi2 = zipimport.zipimporter(TEMP_ZIP)
-        self.assertEqual(zi2._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi2._get_files()), sorted([*files, *extra_files]))
         # Add a new file to the ZIP archive to make the cache wrong.
         newfile = {"spam2" + pyc_ext: (NOW, test_pyc)}
         files.update(newfile)
@@ -587,7 +664,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
         # Invalidate the cache of the first zipimporter.
         zi.invalidate_caches()
         # Check that the second zipimporter detects the new file and isn't using a stale cache.
-        self.assertEqual(zi2._get_files().keys(), files.keys())
+        self.assertEqual(sorted(zi2._get_files()), sorted([*files, *extra_files]))
         spec = zi2.find_spec('spam2')
         self.assertIsNotNone(spec)
         self.assertIsInstance(spec.loader, zipimport.zipimporter)
@@ -650,17 +727,33 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
         self.assertIsNone(loader.get_source(mod_name))
         self.assertEqual(loader.get_filename(mod_name), mod.__file__)
 
-    def testGetData(self):
+    def testGetDataExplicitDirectories(self):
         self.addCleanup(os_helper.unlink, TEMP_ZIP)
-        with ZipFile(TEMP_ZIP, "w") as z:
-            z.compression = self.compression
-            name = "testdata.dat"
-            data = bytes(x for x in range(256))
-            z.writestr(name, data)
-
-        zi = zipimport.zipimporter(TEMP_ZIP)
-        self.assertEqual(data, zi.get_data(name))
-        self.assertIn('zipimporter object', repr(zi))
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            z.mkdir('a')
+            z.mkdir('a/b')
+            z.mkdir('a/b/c')
+            data = bytes(range(256))
+            z.writestr('a/b/c/testdata.dat', data)
+        self._testGetData()
+
+    def testGetDataImplicitDirectories(self):
+        self.addCleanup(os_helper.unlink, TEMP_ZIP)
+        with ZipFile(TEMP_ZIP, 'w', compression=self.compression) as z:
+            data = bytes(range(256))
+            z.writestr('a/b/c/testdata.dat', data)
+        self._testGetData()
+
+    def _testGetData(self):
+        zi = zipimport.zipimporter(os.path.join(TEMP_ZIP, 'ignored'))
+        pathname = os.path.join('a', 'b', 'c', 'testdata.dat')
+        data = bytes(range(256))
+        self.assertEqual(zi.get_data(pathname), data)
+        self.assertEqual(zi.get_data(os.path.join(TEMP_ZIP, pathname)), data)
+        self.assertEqual(zi.get_data(os.path.join('a', 'b', '')), b'')
+        self.assertEqual(zi.get_data(os.path.join(TEMP_ZIP, 'a', 'b', '')), b'')
+        self.assertRaises(OSError, zi.get_data, os.path.join('a', 'b'))
+        self.assertRaises(OSError, zi.get_data, os.path.join(TEMP_ZIP, 'a', 'b'))
 
     def testImporterAttr(self):
         src = """if 1:  # indent hack
index a49a21f0799df270f8ac30300cc172d84eb38e16..a79862f1de756410ebc13fe1845f3e505e66def7 100644 (file)
@@ -155,6 +155,8 @@ class zipimporter(_bootstrap_external._LoaderBasics):
             toc_entry = self._get_files()[key]
         except KeyError:
             raise OSError(0, '', key)
+        if toc_entry is None:
+            return b''
         return _get_data(self.archive, toc_entry)
 
 
@@ -554,6 +556,20 @@ def _read_directory(archive):
         finally:
             fp.seek(start_offset)
     _bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)
+
+    # Add implicit directories.
+    for name in list(files):
+        while True:
+            i = name.rstrip(path_sep).rfind(path_sep)
+            if i < 0:
+                break
+            name = name[:i + 1]
+            if name in files:
+                break
+            files[name] = None
+            count += 1
+    _bootstrap._verbose_message('zipimport: added {} implicit directories in {!r}',
+                                count, archive)
     return files
 
 # During bootstrap, we may need to load the encodings
diff --git a/Misc/NEWS.d/next/Library/2024-07-04-17-36-03.gh-issue-59110.IlI9Fz.rst b/Misc/NEWS.d/next/Library/2024-07-04-17-36-03.gh-issue-59110.IlI9Fz.rst
new file mode 100644 (file)
index 0000000..b8e3ee0
--- /dev/null
@@ -0,0 +1,2 @@
+:mod:`zipimport` supports now namespace packages when no directory entry
+exists.