]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-101659: Clean Up the General Import Tests for Subinterpreters (gh-103151)
authorEric Snow <ericsnowcurrently@gmail.com>
Fri, 31 Mar 2023 18:18:33 +0000 (12:18 -0600)
committerGitHub <noreply@github.com>
Fri, 31 Mar 2023 18:18:33 +0000 (12:18 -0600)
This involves 3 changes: some general cleanup, checks to match the kind of module, and switch from testing against sys to _imp.

This is a precursor to gh-103150, though the changes are meant to stand on their own.

Lib/test/test_import/__init__.py

index 96815b3f758a5b83ad65de9ff53b6e093e54a170..3ef07203c46c7e99697ca7b16e65dbecad73235c 100644 (file)
@@ -4,6 +4,9 @@ import errno
 import glob
 import importlib.util
 from importlib._bootstrap_external import _get_sourcefile
+from importlib.machinery import (
+    BuiltinImporter, ExtensionFileLoader, FrozenImporter, SourceFileLoader,
+)
 import marshal
 import os
 import py_compile
@@ -44,6 +47,49 @@ skip_if_dont_write_bytecode = unittest.skipIf(
         sys.dont_write_bytecode,
         "test meaningful only when writing bytecode")
 
+
+def _require_loader(module, loader, skip):
+    if isinstance(module, str):
+        module = __import__(module)
+
+    MODULE_KINDS = {
+        BuiltinImporter: 'built-in',
+        ExtensionFileLoader: 'extension',
+        FrozenImporter: 'frozen',
+        SourceFileLoader: 'pure Python',
+    }
+
+    expected = loader
+    assert isinstance(expected, type), expected
+    expected = MODULE_KINDS[expected]
+
+    actual = module.__spec__.loader
+    if not isinstance(actual, type):
+        actual = type(actual)
+    actual = MODULE_KINDS[actual]
+
+    if actual != expected:
+        err = f'expected module to be {expected}, got {module.__spec__}'
+        if skip:
+            raise unittest.SkipTest(err)
+        raise Exception(err)
+    return module
+
+def require_builtin(module, *, skip=False):
+    module = _require_loader(module, BuiltinImporter, skip)
+    assert module.__spec__.origin == 'built-in', module.__spec__
+
+def require_extension(module, *, skip=False):
+    _require_loader(module, ExtensionFileLoader, skip)
+
+def require_frozen(module, *, skip=True):
+    module = _require_loader(module, FrozenImporter, skip)
+    assert module.__spec__.origin == 'frozen', module.__spec__
+
+def require_pure_python(module, *, skip=False):
+    _require_loader(module, SourceFileLoader, skip)
+
+
 def remove_files(name):
     for f in (name + ".py",
               name + ".pyc",
@@ -1437,10 +1483,10 @@ class SubinterpImportTests(unittest.TestCase):
             os.write({fd}, text.encode('utf-8'))
             ''')
 
-    def run_shared(self, name, *,
-                   check_singlephase_setting=False,
-                   check_singlephase_override=None,
-                   ):
+    def run_here(self, name, *,
+                 check_singlephase_setting=False,
+                 check_singlephase_override=None,
+                 ):
         """
         Try importing the named module in a subinterpreter.
 
@@ -1470,27 +1516,35 @@ class SubinterpImportTests(unittest.TestCase):
         self.assertEqual(ret, 0)
         return os.read(r, 100)
 
-    def check_compatible_shared(self, name, *, strict=False):
+    def check_compatible_here(self, name, *, strict=False):
         # Verify that the named module may be imported in a subinterpreter.
-        # (See run_shared() for more info.)
-        out = self.run_shared(name, check_singlephase_setting=strict)
+        # (See run_here() for more info.)
+        out = self.run_here(name,
+                            check_singlephase_setting=strict,
+                            )
         self.assertEqual(out, b'okay')
 
-    def check_incompatible_shared(self, name):
-        # Differences from check_compatible_shared():
+    def check_incompatible_here(self, name):
+        # Differences from check_compatible_here():
         #  * verify that import fails
         #  * "strict" is always True
-        out = self.run_shared(name, check_singlephase_setting=True)
+        out = self.run_here(name,
+                            check_singlephase_setting=True,
+                            )
         self.assertEqual(
             out.decode('utf-8'),
             f'ImportError: module {name} does not support loading in subinterpreters',
         )
 
-    def check_compatible_isolated(self, name, *, strict=False):
-        # Differences from check_compatible_shared():
+    def check_compatible_fresh(self, name, *, strict=False):
+        # Differences from check_compatible_here():
         #  * subinterpreter in a new process
         #  * module has never been imported before in that process
         #  * this tests importing the module for the first time
+        kwargs = dict(
+            **self.RUN_KWARGS,
+            check_multi_interp_extensions=strict,
+        )
         _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f'''
             import _testcapi, sys
             assert (
@@ -1499,25 +1553,27 @@ class SubinterpImportTests(unittest.TestCase):
             ), repr({name!r})
             ret = _testcapi.run_in_subinterp_with_config(
                 {self.import_script(name, "sys.stdout.fileno()")!r},
-                **{self.RUN_KWARGS},
-                check_multi_interp_extensions={strict},
+                **{kwargs},
             )
             assert ret == 0, ret
             '''))
         self.assertEqual(err, b'')
         self.assertEqual(out, b'okay')
 
-    def check_incompatible_isolated(self, name):
-        # Differences from check_compatible_isolated():
+    def check_incompatible_fresh(self, name):
+        # Differences from check_compatible_fresh():
         #  * verify that import fails
         #  * "strict" is always True
+        kwargs = dict(
+            **self.RUN_KWARGS,
+            check_multi_interp_extensions=True,
+        )
         _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f'''
             import _testcapi, sys
             assert {name!r} not in sys.modules, {name!r}
             ret = _testcapi.run_in_subinterp_with_config(
                 {self.import_script(name, "sys.stdout.fileno()")!r},
-                **{self.RUN_KWARGS},
-                check_multi_interp_extensions=True,
+                **{kwargs},
             )
             assert ret == 0, ret
             '''))
@@ -1528,59 +1584,65 @@ class SubinterpImportTests(unittest.TestCase):
         )
 
     def test_builtin_compat(self):
-        module = 'sys'
+        # For now we avoid using sys or builtins
+        # since they still don't implement multi-phase init.
+        module = '_imp'
+        require_builtin(module)
         with self.subTest(f'{module}: not strict'):
-            self.check_compatible_shared(module, strict=False)
-        with self.subTest(f'{module}: strict, shared'):
-            self.check_compatible_shared(module, strict=True)
+            self.check_compatible_here(module, strict=False)
+        with self.subTest(f'{module}: strict, not fresh'):
+            self.check_compatible_here(module, strict=True)
 
     @cpython_only
     def test_frozen_compat(self):
         module = '_frozen_importlib'
+        require_frozen(module, skip=True)
         if __import__(module).__spec__.origin != 'frozen':
             raise unittest.SkipTest(f'{module} is unexpectedly not frozen')
         with self.subTest(f'{module}: not strict'):
-            self.check_compatible_shared(module, strict=False)
-        with self.subTest(f'{module}: strict, shared'):
-            self.check_compatible_shared(module, strict=True)
+            self.check_compatible_here(module, strict=False)
+        with self.subTest(f'{module}: strict, not fresh'):
+            self.check_compatible_here(module, strict=True)
 
     @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
     def test_single_init_extension_compat(self):
         module = '_testsinglephase'
+        require_extension(module)
         with self.subTest(f'{module}: not strict'):
-            self.check_compatible_shared(module, strict=False)
-        with self.subTest(f'{module}: strict, shared'):
-            self.check_incompatible_shared(module)
-        with self.subTest(f'{module}: strict, isolated'):
-            self.check_incompatible_isolated(module)
+            self.check_compatible_here(module, strict=False)
+        with self.subTest(f'{module}: strict, not fresh'):
+            self.check_incompatible_here(module)
+        with self.subTest(f'{module}: strict, fresh'):
+            self.check_incompatible_fresh(module)
 
     @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
     def test_multi_init_extension_compat(self):
         module = '_testmultiphase'
+        require_extension(module)
         with self.subTest(f'{module}: not strict'):
-            self.check_compatible_shared(module, strict=False)
-        with self.subTest(f'{module}: strict, shared'):
-            self.check_compatible_shared(module, strict=True)
-        with self.subTest(f'{module}: strict, isolated'):
-            self.check_compatible_isolated(module, strict=True)
+            self.check_compatible_here(module, strict=False)
+        with self.subTest(f'{module}: strict, not fresh'):
+            self.check_compatible_here(module, strict=True)
+        with self.subTest(f'{module}: strict, fresh'):
+            self.check_compatible_fresh(module, strict=True)
 
     def test_python_compat(self):
         module = 'threading'
-        if __import__(module).__spec__.origin == 'frozen':
-            raise unittest.SkipTest(f'{module} is unexpectedly frozen')
+        require_pure_python(module)
         with self.subTest(f'{module}: not strict'):
-            self.check_compatible_shared(module, strict=False)
-        with self.subTest(f'{module}: strict, shared'):
-            self.check_compatible_shared(module, strict=True)
-        with self.subTest(f'{module}: strict, isolated'):
-            self.check_compatible_isolated(module, strict=True)
+            self.check_compatible_here(module, strict=False)
+        with self.subTest(f'{module}: strict, not fresh'):
+            self.check_compatible_here(module, strict=True)
+        with self.subTest(f'{module}: strict, fresh'):
+            self.check_compatible_fresh(module, strict=True)
 
     @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
     def test_singlephase_check_with_setting_and_override(self):
         module = '_testsinglephase'
+        require_extension(module)
 
         def check_compatible(setting, override):
-            out = self.run_shared(
+            out = self.run_here(
                 module,
                 check_singlephase_setting=setting,
                 check_singlephase_override=override,
@@ -1588,7 +1650,7 @@ class SubinterpImportTests(unittest.TestCase):
             self.assertEqual(out, b'okay')
 
         def check_incompatible(setting, override):
-            out = self.run_shared(
+            out = self.run_here(
                 module,
                 check_singlephase_setting=setting,
                 check_singlephase_override=override,