from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
+from multiprocessing.util import _cleanup_tests as multiprocessing_cleanup_tests
from test.test_asyncio import utils as test_utils
from test import support
from test.support import socket_helper
# multiprocessing.synchronize module cannot be imported.
support.skip_if_broken_multiprocessing_synchronize()
+ self.addCleanup(multiprocessing_cleanup_tests)
+
async def main():
pool = concurrent.futures.ProcessPoolExecutor()
result = await self.loop.run_in_executor(
try:
# compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
# and it can function.
+ from multiprocessing.util import _cleanup_tests as multiprocessing_cleanup_tests
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import _check_system_limits
_check_system_limits()
def setUp(self):
self.directory = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.directory)
+
self.source_path = os.path.join(self.directory, '_test.py')
self.bc_path = importlib.util.cache_from_source(self.source_path)
with open(self.source_path, 'w', encoding="utf-8") as file:
self.source_path3 = os.path.join(self.subdirectory, '_test3.py')
shutil.copyfile(self.source_path, self.source_path3)
- def tearDown(self):
- shutil.rmtree(self.directory)
-
def add_bad_source_file(self):
self.bad_source_path = os.path.join(self.directory, '_test_bad.py')
with open(self.bad_source_path, 'w', encoding="utf-8") as file:
script_helper.make_script(path, "__init__", "")
mods.append(script_helper.make_script(path, "mod",
"def fn(): 1/0\nfn()\n"))
+
+ if parallel:
+ self.addCleanup(multiprocessing_cleanup_tests)
compileall.compile_dir(
self.directory, quiet=True, ddir=ddir,
workers=2 if parallel else 1)
+
self.assertTrue(mods)
for mod in mods:
self.assertTrue(mod.startswith(self.directory), mod)