return spec
+# Normally we would use contextlib.contextmanager. However, this module
+# is imported by runpy, which means we want to avoid any unnecessary
+# dependencies. Thus we use a class.
+
+class allowing_all_extensions:
+ """A context manager that lets users skip the compatibility check.
+
+ Normally, extensions that do not support multiple interpreters
+ may not be imported in a subinterpreter. That implies modules
+ that do not implement multi-phase init.
+
+ Likewise for modules import in a subinterpeter with its own GIL
+ when the extension does not support a per-interpreter GIL. This
+ implies the module does not have a Py_mod_multiple_interpreters slot
+ set to Py_MOD_PER_INTERPRETER_GIL_SUPPORTED.
+
+ In both cases, this context manager may be used to temporarily
+ disable the check for compatible extension modules.
+ """
+
+ def __init__(self, disable_check=True):
+ self.disable_check = disable_check
+
+ def __enter__(self):
+ self.old = _imp._override_multi_interp_extensions_check(self.override)
+ return self
+
+ def __exit__(self, *args):
+ old = self.old
+ del self.old
+ _imp._override_multi_interp_extensions_check(old)
+
+ @property
+ def override(self):
+ return -1 if self.disable_check else 1
+
+
class _LazyModule(types.ModuleType):
"""A subclass of the module type which triggers loading upon attribute access."""
import importlib.util
import os
import pathlib
+import re
import string
import sys
from test import support
+import textwrap
import types
import unittest
import unittest.mock
import warnings
+try:
+ import _testsinglephase
+except ImportError:
+ _testsinglephase = None
+try:
+ import _testmultiphase
+except ImportError:
+ _testmultiphase = None
+try:
+ import _xxsubinterpreters as _interpreters
+except ModuleNotFoundError:
+ _interpreters = None
+
class DecodeSourceBytesTests:
self.assertEqual(EXPECTED_MAGIC_NUMBER, actual, msg)
+@unittest.skipIf(_interpreters is None, 'subinterpreters required')
+class AllowingAllExtensionsTests(unittest.TestCase):
+
+ ERROR = re.compile("^<class 'ImportError'>: module (.*) does not support loading in subinterpreters")
+
+ def run_with_own_gil(self, script):
+ interpid = _interpreters.create(isolated=True)
+ try:
+ _interpreters.run_string(interpid, script)
+ except _interpreters.RunFailedError as exc:
+ if m := self.ERROR.match(str(exc)):
+ modname, = m.groups()
+ raise ImportError(modname)
+
+ def run_with_shared_gil(self, script):
+ interpid = _interpreters.create(isolated=False)
+ try:
+ _interpreters.run_string(interpid, script)
+ except _interpreters.RunFailedError as exc:
+ if m := self.ERROR.match(str(exc)):
+ modname, = m.groups()
+ raise ImportError(modname)
+
+ @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module")
+ def test_single_phase_init_module(self):
+ script = textwrap.dedent('''
+ import importlib.util
+ with importlib.util.allowing_all_extensions():
+ import _testsinglephase
+ ''')
+ with self.subTest('check disabled, shared GIL'):
+ self.run_with_shared_gil(script)
+ with self.subTest('check disabled, per-interpreter GIL'):
+ self.run_with_own_gil(script)
+
+ script = textwrap.dedent(f'''
+ import importlib.util
+ with importlib.util.allowing_all_extensions(False):
+ import _testsinglephase
+ ''')
+ with self.subTest('check enabled, shared GIL'):
+ with self.assertRaises(ImportError):
+ self.run_with_shared_gil(script)
+ with self.subTest('check enabled, per-interpreter GIL'):
+ with self.assertRaises(ImportError):
+ self.run_with_own_gil(script)
+
+ @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
+ def test_incomplete_multi_phase_init_module(self):
+ prescript = textwrap.dedent(f'''
+ from importlib.util import spec_from_loader, module_from_spec
+ from importlib.machinery import ExtensionFileLoader
+
+ name = '_test_shared_gil_only'
+ filename = {_testmultiphase.__file__!r}
+ loader = ExtensionFileLoader(name, filename)
+ spec = spec_from_loader(name, loader)
+
+ ''')
+
+ script = prescript + textwrap.dedent('''
+ import importlib.util
+ with importlib.util.allowing_all_extensions():
+ module = module_from_spec(spec)
+ loader.exec_module(module)
+ ''')
+ with self.subTest('check disabled, shared GIL'):
+ self.run_with_shared_gil(script)
+ with self.subTest('check disabled, per-interpreter GIL'):
+ self.run_with_own_gil(script)
+
+ script = prescript + textwrap.dedent('''
+ import importlib.util
+ with importlib.util.allowing_all_extensions(False):
+ module = module_from_spec(spec)
+ loader.exec_module(module)
+ ''')
+ with self.subTest('check enabled, shared GIL'):
+ self.run_with_shared_gil(script)
+ with self.subTest('check enabled, per-interpreter GIL'):
+ with self.assertRaises(ImportError):
+ self.run_with_own_gil(script)
+
+ @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module")
+ def test_complete_multi_phase_init_module(self):
+ script = textwrap.dedent('''
+ import importlib.util
+ with importlib.util.allowing_all_extensions():
+ import _testmultiphase
+ ''')
+ with self.subTest('check disabled, shared GIL'):
+ self.run_with_shared_gil(script)
+ with self.subTest('check disabled, per-interpreter GIL'):
+ self.run_with_own_gil(script)
+
+ script = textwrap.dedent(f'''
+ import importlib.util
+ with importlib.util.allowing_all_extensions(False):
+ import _testmultiphase
+ ''')
+ with self.subTest('check enabled, shared GIL'):
+ self.run_with_shared_gil(script)
+ with self.subTest('check enabled, per-interpreter GIL'):
+ self.run_with_own_gil(script)
+
+
if __name__ == '__main__':
unittest.main()