git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.11] gh-108388: Convert test_concurrent_futures to package (#108401) (#109704)
author Victor Stinner <vstinner@python.org>
Fri, 22 Sep 2023 01:37:25 +0000 (03:37 +0200)
committer GitHub <noreply@github.com>
Fri, 22 Sep 2023 01:37:25 +0000 (01:37 +0000)
* gh-108388: Convert test_concurrent_futures to package (#108401)

Convert test_concurrent_futures to a package of sub-tests.

(cherry picked from commit aa6f787faa4bc45006da4dc2f942fb9b82c98836)

Notes on backport to 3.11:

* AsCompletedTests: Revert test_future_times_out() => test_zero_timeout()
* Restore TODO comment
* ThreadPoolExecutorTest.test_hang_global_shutdown_lock():
  add @support.requires_resource('cpu').
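
For example (assuming a built ./python), a single sub-suite of the new
package can be run on its own with unittest:

    ./python -m unittest -v test.test_concurrent_futures.test_wait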

14 files changed:
Lib/test/libregrtest/runtest.py
Lib/test/test_concurrent_futures.py [deleted file]
Lib/test/test_concurrent_futures/__init__.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/executor.py
Lib/test/test_concurrent_futures/test_as_completed.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_deadlock.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_future.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_init.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_process_pool.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_shutdown.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_thread_pool.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/test_wait.py
Lib/test/test_concurrent_futures/util.py [new file with mode: 0644]
Misc/NEWS.d/next/Tests/2023-08-24-06-10-36.gh-issue-108388.YCVB0D.rst [new file with mode: 0644]
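
The new test_concurrent_futures/__init__.py presumably follows the usual
CPython test-package pattern (a sketch modeled on test_asyncio/__init__.py,
not the verbatim file):

    import os.path
    from test.support import load_package_tests

    def load_tests(*args):
        # Let unittest discover every test_*.py module in this package.
        return load_package_tests(os.path.dirname(__file__), *args)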

diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
index 74e28e27e1abac3b5fbecb9088d4ffcd29ace9b6..000a4aa75c44a0c0a26f753a288403793daa1125 100644
@@ -141,6 +141,7 @@ PROGRESS_MIN_TIME = 30.0   # seconds
 
 SPLITTESTDIRS = {
     "test_asyncio",
+    "test_concurrent_futures",
     "test_future_stmt",
     "test_multiprocessing_fork",
     "test_multiprocessing_forkserver",
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
deleted file mode 100644
index a2a4d87..0000000
+++ /dev/null
@@ -1,1673 +0,0 @@
-from test import support
-from test.support import import_helper
-from test.support import threading_helper
-
-# Skip tests if _multiprocessing wasn't built.
-import_helper.import_module('_multiprocessing')
-
-from test.support import hashlib_helper
-from test.support.script_helper import assert_python_ok
-
-import contextlib
-import itertools
-import logging
-from logging.handlers import QueueHandler
-import os
-import queue
-import signal
-import sys
-import threading
-import time
-import unittest
-import weakref
-from pickle import PicklingError
-
-from concurrent import futures
-from concurrent.futures._base import (
-    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
-    BrokenExecutor)
-from concurrent.futures.process import BrokenProcessPool, _check_system_limits
-
-import multiprocessing.process
-import multiprocessing.util
-import multiprocessing as mp
-
-
-if support.check_sanitizer(address=True, memory=True):
-    # bpo-46633: Skip the test because it is too slow when Python is built
-    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
-    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
-
-
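-# Helper to build Future fixtures in an arbitrary internal state by setting
-# the private attributes directly.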
-def create_future(state=PENDING, exception=None, result=None):
-    f = Future()
-    f._state = state
-    f._exception = exception
-    f._result = result
-    return f
-
-
-PENDING_FUTURE = create_future(state=PENDING)
-RUNNING_FUTURE = create_future(state=RUNNING)
-CANCELLED_FUTURE = create_future(state=CANCELLED)
-CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
-EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
-SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
-
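-# Module-level flag set by init() in each worker and read back via
-# get_init_status(); used by the initializer tests.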
-INITIALIZER_STATUS = 'uninitialized'
-
-def mul(x, y):
-    return x * y
-
-def capture(*args, **kwargs):
-    return args, kwargs
-
-def sleep_and_raise(t):
-    time.sleep(t)
-    raise Exception('this is an exception')
-
-def sleep_and_print(t, msg):
-    time.sleep(t)
-    print(msg)
-    sys.stdout.flush()
-
-def init(x):
-    global INITIALIZER_STATUS
-    INITIALIZER_STATUS = x
-
-def get_init_status():
-    return INITIALIZER_STATUS
-
-def init_fail(log_queue=None):
-    if log_queue is not None:
-        logger = logging.getLogger('concurrent.futures')
-        logger.addHandler(QueueHandler(log_queue))
-        logger.setLevel('CRITICAL')
-        logger.propagate = False
-    time.sleep(0.1)  # let some futures be scheduled
-    raise ValueError('error in initializer')
-
-
-class MyObject(object):
-    def my_method(self):
-        pass
-
-
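-# Object whose __del__() sets a manager Event, letting tests observe when an
-# instance is garbage-collected in a worker process.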
-class EventfulGCObj():
-    def __init__(self, mgr):
-        self.event = mgr.Event()
-
-    def __del__(self):
-        self.event.set()
-
-
-def make_dummy_object(_):
-    return MyObject()
-
-
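-# Base class for all test cases: snapshots the live threads in setUp(), then
-# reaps child processes and checks for leaked threads in tearDown().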
-class BaseTestCase(unittest.TestCase):
-    def setUp(self):
-        self._thread_key = threading_helper.threading_setup()
-
-    def tearDown(self):
-        support.reap_children()
-        threading_helper.threading_cleanup(*self._thread_key)
-
-
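-# Creates a fresh executor in setUp() and shuts it down in tearDown(); also
-# fails the test if it runs far too long (a synchronization issue).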
-class ExecutorMixin:
-    worker_count = 5
-    executor_kwargs = {}
-
-    def setUp(self):
-        super().setUp()
-
-        self.t1 = time.monotonic()
-        if hasattr(self, "ctx"):
-            self.executor = self.executor_type(
-                max_workers=self.worker_count,
-                mp_context=self.get_context(),
-                **self.executor_kwargs)
-        else:
-            self.executor = self.executor_type(
-                max_workers=self.worker_count,
-                **self.executor_kwargs)
-
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-        self.executor = None
-
-        dt = time.monotonic() - self.t1
-        if support.verbose:
-            print("%.2fs" % dt, end=' ')
-        self.assertLess(dt, 300, "synchronization issue: test lasted too long")
-
-        super().tearDown()
-
-    def get_context(self):
-        return mp.get_context(self.ctx)
-
-
-class ThreadPoolMixin(ExecutorMixin):
-    executor_type = futures.ThreadPoolExecutor
-
-
-class ProcessPoolForkMixin(ExecutorMixin):
-    executor_type = futures.ProcessPoolExecutor
-    ctx = "fork"
-
-    def get_context(self):
-        try:
-            _check_system_limits()
-        except NotImplementedError:
-            self.skipTest("ProcessPoolExecutor unavailable on this system")
-        if sys.platform == "win32":
-            self.skipTest("require unix system")
-        return super().get_context()
-
-
-class ProcessPoolSpawnMixin(ExecutorMixin):
-    executor_type = futures.ProcessPoolExecutor
-    ctx = "spawn"
-
-    def get_context(self):
-        try:
-            _check_system_limits()
-        except NotImplementedError:
-            self.skipTest("ProcessPoolExecutor unavailable on this system")
-        return super().get_context()
-
-
-class ProcessPoolForkserverMixin(ExecutorMixin):
-    executor_type = futures.ProcessPoolExecutor
-    ctx = "forkserver"
-
-    def get_context(self):
-        try:
-            _check_system_limits()
-        except NotImplementedError:
-            self.skipTest("ProcessPoolExecutor unavailable on this system")
-        if sys.platform == "win32":
-            self.skipTest("require unix system")
-        return super().get_context()
-
-
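-# Test-class factory: combines the given test mixin with each executor mixin
-# and registers the resulting concrete TestCase classes in globals() so that
-# unittest discovers them.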
-def create_executor_tests(mixin, bases=(BaseTestCase,),
-                          executor_mixins=(ThreadPoolMixin,
-                                           ProcessPoolForkMixin,
-                                           ProcessPoolForkserverMixin,
-                                           ProcessPoolSpawnMixin)):
-    def strip_mixin(name):
-        if name.endswith(('Mixin', 'Tests')):
-            return name[:-5]
-        elif name.endswith('Test'):
-            return name[:-4]
-        else:
-            return name
-
-    for exe in executor_mixins:
-        name = ("%s%sTest"
-                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
-        cls = type(name, (mixin,) + (exe,) + bases, {})
-        globals()[name] = cls
-
-
-class InitializerMixin(ExecutorMixin):
-    worker_count = 2
-
-    def setUp(self):
-        global INITIALIZER_STATUS
-        INITIALIZER_STATUS = 'uninitialized'
-        self.executor_kwargs = dict(initializer=init,
-                                    initargs=('initialized',))
-        super().setUp()
-
-    def test_initializer(self):
-        futures = [self.executor.submit(get_init_status)
-                   for _ in range(self.worker_count)]
-
-        for f in futures:
-            self.assertEqual(f.result(), 'initialized')
-
-
-class FailingInitializerMixin(ExecutorMixin):
-    worker_count = 2
-
-    def setUp(self):
-        if hasattr(self, "ctx"):
-            # Pass a queue to redirect the child's logging output
-            self.mp_context = self.get_context()
-            self.log_queue = self.mp_context.Queue()
-            self.executor_kwargs = dict(initializer=init_fail,
-                                        initargs=(self.log_queue,))
-        else:
-            # In a thread pool, the child shares our logging setup
-            # (see _assert_logged())
-            self.mp_context = None
-            self.log_queue = None
-            self.executor_kwargs = dict(initializer=init_fail)
-        super().setUp()
-
-    def test_initializer(self):
-        with self._assert_logged('ValueError: error in initializer'):
-            try:
-                future = self.executor.submit(get_init_status)
-            except BrokenExecutor:
-                # Perhaps the executor is already broken
-                pass
-            else:
-                with self.assertRaises(BrokenExecutor):
-                    future.result()
-            # At some point, the executor should break
-            t1 = time.monotonic()
-            while not self.executor._broken:
-                if time.monotonic() - t1 > 5:
-                    self.fail("executor not broken after 5 s.")
-                time.sleep(0.01)
-            # ... and from this point submit() is guaranteed to fail
-            with self.assertRaises(BrokenExecutor):
-                self.executor.submit(get_init_status)
-
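-    # Assert that `msg` was logged: read from the multiprocessing queue for
-    # process pools, or use assertLogs() for thread pools, whose workers
-    # share our logging setup.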
-    @contextlib.contextmanager
-    def _assert_logged(self, msg):
-        if self.log_queue is not None:
-            yield
-            output = []
-            try:
-                while True:
-                    output.append(self.log_queue.get_nowait().getMessage())
-            except queue.Empty:
-                pass
-        else:
-            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
-                yield
-            output = cm.output
-        self.assertTrue(any(msg in line for line in output),
-                        output)
-
-
-create_executor_tests(InitializerMixin)
-create_executor_tests(FailingInitializerMixin)
-
-
-class ExecutorShutdownTest:
-    def test_run_after_shutdown(self):
-        self.executor.shutdown()
-        self.assertRaises(RuntimeError,
-                          self.executor.submit,
-                          pow, 2, 5)
-
-    def test_interpreter_shutdown(self):
-        # Test the atexit hook for shutdown of worker threads and processes
-        rc, out, err = assert_python_ok('-c', """if 1:
-            from concurrent.futures import {executor_type}
-            from time import sleep
-            from test.test_concurrent_futures import sleep_and_print
-            if __name__ == "__main__":
-                context = '{context}'
-                if context == "":
-                    t = {executor_type}(5)
-                else:
-                    from multiprocessing import get_context
-                    context = get_context(context)
-                    t = {executor_type}(5, mp_context=context)
-                t.submit(sleep_and_print, 1.0, "apple")
-            """.format(executor_type=self.executor_type.__name__,
-                       context=getattr(self, "ctx", "")))
-        # Errors in atexit hooks don't change the process exit code; check
-        # stderr manually.
-        self.assertFalse(err)
-        self.assertEqual(out.strip(), b"apple")
-
-    def test_submit_after_interpreter_shutdown(self):
-        # Test the atexit hook for shutdown of worker threads and processes
-        rc, out, err = assert_python_ok('-c', """if 1:
-            import atexit
-            @atexit.register
-            def run_last():
-                try:
-                    t.submit(id, None)
-                except RuntimeError:
-                    print("runtime-error")
-                    raise
-            from concurrent.futures import {executor_type}
-            if __name__ == "__main__":
-                context = '{context}'
-                if not context:
-                    t = {executor_type}(5)
-                else:
-                    from multiprocessing import get_context
-                    context = get_context(context)
-                    t = {executor_type}(5, mp_context=context)
-                    t.submit(id, 42).result()
-            """.format(executor_type=self.executor_type.__name__,
-                       context=getattr(self, "ctx", "")))
-        # Errors in atexit hooks don't change the process exit code; check
-        # stderr manually.
-        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
-        self.assertEqual(out.strip(), b"runtime-error")
-
-    def test_hang_issue12364(self):
-        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
-        self.executor.shutdown()
-        for f in fs:
-            f.result()
-
-    def test_cancel_futures(self):
-        assert self.worker_count <= 5, "test needs few workers"
-        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
-        self.executor.shutdown(cancel_futures=True)
-        # We can't guarantee the exact number of cancellations, but we can
-        # guarantee that *some* were cancelled. With few workers, many of
-        # the submitted futures should have been cancelled.
-        cancelled = [fut for fut in fs if fut.cancelled()]
-        self.assertGreater(len(cancelled), 20)
-
-        # Ensure the other futures were able to finish.
-        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
-        # that may have been left in a pending state.
-        others = [fut for fut in fs if not fut.cancelled()]
-        for fut in others:
-            self.assertTrue(fut.done(), msg=f"{fut._state=}")
-            self.assertIsNone(fut.exception())
-
-        # Similar to the number of cancelled futures, we can't guarantee the
-        # exact number that completed. But, we can guarantee that at least
-        # one finished.
-        self.assertGreater(len(others), 0)
-
-    def test_hang_gh83386(self):
-        """shutdown(wait=False) doesn't hang at exit with running futures.
-
-        See https://github.com/python/cpython/issues/83386.
-        """
-        if self.executor_type == futures.ProcessPoolExecutor:
-            raise unittest.SkipTest(
-                "Hangs, see https://github.com/python/cpython/issues/83386")
-
-        rc, out, err = assert_python_ok('-c', """if True:
-            from concurrent.futures import {executor_type}
-            from test.test_concurrent_futures import sleep_and_print
-            if __name__ == "__main__":
-                if {context!r}: multiprocessing.set_start_method({context!r})
-                t = {executor_type}(max_workers=3)
-                t.submit(sleep_and_print, 1.0, "apple")
-                t.shutdown(wait=False)
-            """.format(executor_type=self.executor_type.__name__,
-                       context=getattr(self, 'ctx', None)))
-        self.assertFalse(err)
-        self.assertEqual(out.strip(), b"apple")
-
-    def test_hang_gh94440(self):
-        """shutdown(wait=True) doesn't hang when a future was submitted and
-        quickly canceled right before shutdown.
-
-        See https://github.com/python/cpython/issues/94440.
-        """
-        if not hasattr(signal, 'alarm'):
-            raise unittest.SkipTest(
-                "Tested platform does not support the alarm signal")
-
-        def timeout(_signum, _frame):
-            raise RuntimeError("timed out waiting for shutdown")
-
-        kwargs = {}
-        if getattr(self, 'ctx', None):
-            kwargs['mp_context'] = self.get_context()
-        executor = self.executor_type(max_workers=1, **kwargs)
-        executor.submit(int).result()
-        old_handler = signal.signal(signal.SIGALRM, timeout)
-        try:
-            signal.alarm(5)
-            executor.submit(int).cancel()
-            executor.shutdown(wait=True)
-        finally:
-            signal.alarm(0)
-            signal.signal(signal.SIGALRM, old_handler)
-
-
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
-    def test_threads_terminate(self):
-        def acquire_lock(lock):
-            lock.acquire()
-
-        sem = threading.Semaphore(0)
-        for i in range(3):
-            self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._threads), 3)
-        for i in range(3):
-            sem.release()
-        self.executor.shutdown()
-        for t in self.executor._threads:
-            t.join()
-
-    def test_context_manager_shutdown(self):
-        with futures.ThreadPoolExecutor(max_workers=5) as e:
-            executor = e
-            self.assertEqual(list(e.map(abs, range(-5, 5))),
-                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
-        for t in executor._threads:
-            t.join()
-
-    def test_del_shutdown(self):
-        executor = futures.ThreadPoolExecutor(max_workers=5)
-        res = executor.map(abs, range(-5, 5))
-        threads = executor._threads
-        del executor
-
-        for t in threads:
-            t.join()
-
-        # Make sure the results were all computed before the
-        # executor was shut down.
-        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
-
-    def test_shutdown_no_wait(self):
-        # Ensure that the executor cleans up the threads when calling
-        # shutdown with wait=False
-        executor = futures.ThreadPoolExecutor(max_workers=5)
-        res = executor.map(abs, range(-5, 5))
-        threads = executor._threads
-        executor.shutdown(wait=False)
-        for t in threads:
-            t.join()
-
-        # Make sure the results were all computed before the
-        # executor was shut down.
-        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
-
-
-    def test_thread_names_assigned(self):
-        executor = futures.ThreadPoolExecutor(
-            max_workers=5, thread_name_prefix='SpecialPool')
-        executor.map(abs, range(-5, 5))
-        threads = executor._threads
-        del executor
-        support.gc_collect()  # For PyPy or other GCs.
-
-        for t in threads:
-            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
-            t.join()
-
-    def test_thread_names_default(self):
-        executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
-        threads = executor._threads
-        del executor
-        support.gc_collect()  # For PyPy or other GCs.
-
-        for t in threads:
-            # Ensure that our default name is reasonably sane and unique when
-            # no thread_name_prefix was supplied.
-            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
-            t.join()
-
-    def test_cancel_futures_wait_false(self):
-        # Can only be reliably tested for TPE, since PPE often hangs with
-        # `wait=False` (even without *cancel_futures*).
-        rc, out, err = assert_python_ok('-c', """if True:
-            from concurrent.futures import ThreadPoolExecutor
-            from test.test_concurrent_futures import sleep_and_print
-            if __name__ == "__main__":
-                t = ThreadPoolExecutor()
-                t.submit(sleep_and_print, .1, "apple")
-                t.shutdown(wait=False, cancel_futures=True)
-            """)
-        # Errors in atexit hooks don't change the process exit code; check
-        # stderr manually.
-        self.assertFalse(err)
-        self.assertEqual(out.strip(), b"apple")
-
-
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def test_processes_terminate(self):
-        def acquire_lock(lock):
-            lock.acquire()
-
-        mp_context = self.get_context()
-        if mp_context.get_start_method(allow_none=False) == "fork":
-            # fork pre-spawns workers instead of starting them on demand.
-            expected_num_processes = self.worker_count
-        else:
-            expected_num_processes = 3
-
-        sem = mp_context.Semaphore(0)
-        for _ in range(3):
-            self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._processes), expected_num_processes)
-        for _ in range(3):
-            sem.release()
-        processes = self.executor._processes
-        self.executor.shutdown()
-
-        for p in processes.values():
-            p.join()
-
-    def test_context_manager_shutdown(self):
-        with futures.ProcessPoolExecutor(
-                max_workers=5, mp_context=self.get_context()) as e:
-            processes = e._processes
-            self.assertEqual(list(e.map(abs, range(-5, 5))),
-                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
-        for p in processes.values():
-            p.join()
-
-    def test_del_shutdown(self):
-        executor = futures.ProcessPoolExecutor(
-                max_workers=5, mp_context=self.get_context())
-        res = executor.map(abs, range(-5, 5))
-        processes = executor._processes
-        call_queue = executor._call_queue
-        executor_manager_thread = executor._executor_manager_thread
-        del executor
-        support.gc_collect()  # For PyPy or other GCs.
-
-        # Make sure that all the executor resources were properly cleaned up
-        # by the shutdown process
-        executor_manager_thread.join()
-        for p in processes.values():
-            p.join()
-        call_queue.join_thread()
-
-        # Make sure the results were all computed before the
-        # executor was shut down.
-        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
-
-    def test_shutdown_no_wait(self):
-        # Ensure that the executor cleans up the processes when calling
-        # shutdown with wait=False
-        executor = futures.ProcessPoolExecutor(
-                max_workers=5, mp_context=self.get_context())
-        res = executor.map(abs, range(-5, 5))
-        processes = executor._processes
-        call_queue = executor._call_queue
-        executor_manager_thread = executor._executor_manager_thread
-        executor.shutdown(wait=False)
-
-        # Make sure that all the executor resources were properly cleaned up
-        # by the shutdown process
-        executor_manager_thread.join()
-        for p in processes.values():
-            p.join()
-        call_queue.join_thread()
-
-        # Make sure the results were all computed before the executor was
-        # shut down.
-        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
-
-
-create_executor_tests(ProcessPoolShutdownTest,
-                      executor_mixins=(ProcessPoolForkMixin,
-                                       ProcessPoolForkserverMixin,
-                                       ProcessPoolSpawnMixin))
-
-
-class WaitTests:
-    def test_20369(self):
-        # See https://bugs.python.org/issue20369
-        future = self.executor.submit(time.sleep, 1.5)
-        done, not_done = futures.wait([future, future],
-                            return_when=futures.ALL_COMPLETED)
-        self.assertEqual({future}, done)
-        self.assertEqual(set(), not_done)
-
-
-    def test_first_completed(self):
-        future1 = self.executor.submit(mul, 21, 2)
-        future2 = self.executor.submit(time.sleep, 1.5)
-
-        done, not_done = futures.wait(
-                [CANCELLED_FUTURE, future1, future2],
-                 return_when=futures.FIRST_COMPLETED)
-
-        self.assertEqual(set([future1]), done)
-        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
-
-    def test_first_completed_some_already_completed(self):
-        future1 = self.executor.submit(time.sleep, 1.5)
-
-        finished, pending = futures.wait(
-                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
-                 return_when=futures.FIRST_COMPLETED)
-
-        self.assertEqual(
-                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
-                finished)
-        self.assertEqual(set([future1]), pending)
-
-    def test_first_exception(self):
-        future1 = self.executor.submit(mul, 2, 21)
-        future2 = self.executor.submit(sleep_and_raise, 1.5)
-        future3 = self.executor.submit(time.sleep, 3)
-
-        finished, pending = futures.wait(
-                [future1, future2, future3],
-                return_when=futures.FIRST_EXCEPTION)
-
-        self.assertEqual(set([future1, future2]), finished)
-        self.assertEqual(set([future3]), pending)
-
-    def test_first_exception_some_already_complete(self):
-        future1 = self.executor.submit(divmod, 21, 0)
-        future2 = self.executor.submit(time.sleep, 1.5)
-
-        finished, pending = futures.wait(
-                [SUCCESSFUL_FUTURE,
-                 CANCELLED_FUTURE,
-                 CANCELLED_AND_NOTIFIED_FUTURE,
-                 future1, future2],
-                return_when=futures.FIRST_EXCEPTION)
-
-        self.assertEqual(set([SUCCESSFUL_FUTURE,
-                              CANCELLED_AND_NOTIFIED_FUTURE,
-                              future1]), finished)
-        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
-
-    def test_first_exception_one_already_failed(self):
-        future1 = self.executor.submit(time.sleep, 2)
-
-        finished, pending = futures.wait(
-                 [EXCEPTION_FUTURE, future1],
-                 return_when=futures.FIRST_EXCEPTION)
-
-        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
-        self.assertEqual(set([future1]), pending)
-
-    def test_all_completed(self):
-        future1 = self.executor.submit(divmod, 2, 0)
-        future2 = self.executor.submit(mul, 2, 21)
-
-        finished, pending = futures.wait(
-                [SUCCESSFUL_FUTURE,
-                 CANCELLED_AND_NOTIFIED_FUTURE,
-                 EXCEPTION_FUTURE,
-                 future1,
-                 future2],
-                return_when=futures.ALL_COMPLETED)
-
-        self.assertEqual(set([SUCCESSFUL_FUTURE,
-                              CANCELLED_AND_NOTIFIED_FUTURE,
-                              EXCEPTION_FUTURE,
-                              future1,
-                              future2]), finished)
-        self.assertEqual(set(), pending)
-
-    def test_timeout(self):
-        future1 = self.executor.submit(mul, 6, 7)
-        future2 = self.executor.submit(time.sleep, 6)
-
-        finished, pending = futures.wait(
-                [CANCELLED_AND_NOTIFIED_FUTURE,
-                 EXCEPTION_FUTURE,
-                 SUCCESSFUL_FUTURE,
-                 future1, future2],
-                timeout=5,
-                return_when=futures.ALL_COMPLETED)
-
-        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                              EXCEPTION_FUTURE,
-                              SUCCESSFUL_FUTURE,
-                              future1]), finished)
-        self.assertEqual(set([future2]), pending)
-
-
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
-
-    def test_pending_calls_race(self):
-        # Issue #14406: multi-threaded race condition when waiting on all
-        # futures.
-        event = threading.Event()
-        def future_func():
-            event.wait()
-        oldswitchinterval = sys.getswitchinterval()
-        sys.setswitchinterval(1e-6)
-        try:
-            fs = {self.executor.submit(future_func) for i in range(100)}
-            event.set()
-            futures.wait(fs, return_when=futures.ALL_COMPLETED)
-        finally:
-            sys.setswitchinterval(oldswitchinterval)
-
-
-create_executor_tests(WaitTests,
-                      executor_mixins=(ProcessPoolForkMixin,
-                                       ProcessPoolForkserverMixin,
-                                       ProcessPoolSpawnMixin))
-
-
-class AsCompletedTests:
-    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
-    def test_no_timeout(self):
-        future1 = self.executor.submit(mul, 2, 21)
-        future2 = self.executor.submit(mul, 7, 6)
-
-        completed = set(futures.as_completed(
-                [CANCELLED_AND_NOTIFIED_FUTURE,
-                 EXCEPTION_FUTURE,
-                 SUCCESSFUL_FUTURE,
-                 future1, future2]))
-        self.assertEqual(set(
-                [CANCELLED_AND_NOTIFIED_FUTURE,
-                 EXCEPTION_FUTURE,
-                 SUCCESSFUL_FUTURE,
-                 future1, future2]),
-                completed)
-
-    def test_zero_timeout(self):
-        future1 = self.executor.submit(time.sleep, 2)
-        completed_futures = set()
-        try:
-            for future in futures.as_completed(
-                    [CANCELLED_AND_NOTIFIED_FUTURE,
-                     EXCEPTION_FUTURE,
-                     SUCCESSFUL_FUTURE,
-                     future1],
-                    timeout=0):
-                completed_futures.add(future)
-        except futures.TimeoutError:
-            pass
-
-        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                              EXCEPTION_FUTURE,
-                              SUCCESSFUL_FUTURE]),
-                         completed_futures)
-
-    def test_duplicate_futures(self):
-        # Issue 20367. Duplicate futures should not raise exceptions or give
-        # duplicate responses.
-        # Issue #31641: accept arbitrary iterables.
-        future1 = self.executor.submit(time.sleep, 2)
-        completed = [
-            f for f in futures.as_completed(itertools.repeat(future1, 3))
-        ]
-        self.assertEqual(len(completed), 1)
-
-    def test_free_reference_yielded_future(self):
-        # Issue #14406: Generator should not keep references
-        # to finished futures.
-        futures_list = [Future() for _ in range(8)]
-        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
-        futures_list.append(create_future(state=FINISHED, result=42))
-
-        with self.assertRaises(futures.TimeoutError):
-            for future in futures.as_completed(futures_list, timeout=0):
-                futures_list.remove(future)
-                wr = weakref.ref(future)
-                del future
-                support.gc_collect()  # For PyPy or other GCs.
-                self.assertIsNone(wr())
-
-        futures_list[0].set_result("test")
-        for future in futures.as_completed(futures_list):
-            futures_list.remove(future)
-            wr = weakref.ref(future)
-            del future
-            support.gc_collect()  # For PyPy or other GCs.
-            self.assertIsNone(wr())
-            if futures_list:
-                futures_list[0].set_result("test")
-
-    def test_correct_timeout_exception_msg(self):
-        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
-                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]
-
-        with self.assertRaises(futures.TimeoutError) as cm:
-            list(futures.as_completed(futures_list, timeout=0))
-
-        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
-
-
-create_executor_tests(AsCompletedTests)
-
-
-class ExecutorTest:
-    # Executor.shutdown() and context manager usage are tested by
-    # ExecutorShutdownTest.
-    def test_submit(self):
-        future = self.executor.submit(pow, 2, 8)
-        self.assertEqual(256, future.result())
-
-    def test_submit_keyword(self):
-        future = self.executor.submit(mul, 2, y=8)
-        self.assertEqual(16, future.result())
-        future = self.executor.submit(capture, 1, self=2, fn=3)
-        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
-        with self.assertRaises(TypeError):
-            self.executor.submit(fn=capture, arg=1)
-        with self.assertRaises(TypeError):
-            self.executor.submit(arg=1)
-
-    def test_map(self):
-        self.assertEqual(
-                list(self.executor.map(pow, range(10), range(10))),
-                list(map(pow, range(10), range(10))))
-
-        self.assertEqual(
-                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
-                list(map(pow, range(10), range(10))))
-
-    def test_map_exception(self):
-        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
-        self.assertEqual(i.__next__(), (0, 1))
-        self.assertEqual(i.__next__(), (0, 1))
-        self.assertRaises(ZeroDivisionError, i.__next__)
-
-    def test_map_timeout(self):
-        results = []
-        try:
-            for i in self.executor.map(time.sleep,
-                                       [0, 0, 6],
-                                       timeout=5):
-                results.append(i)
-        except futures.TimeoutError:
-            pass
-        else:
-            self.fail('expected TimeoutError')
-
-        self.assertEqual([None, None], results)
-
-    def test_shutdown_race_issue12456(self):
-        # Issue #12456: race condition at shutdown where trying to post a
-        # sentinel in the call queue blocks (the queue is full while processes
-        # have exited).
-        self.executor.map(str, [2] * (self.worker_count + 1))
-        self.executor.shutdown()
-
-    @support.cpython_only
-    def test_no_stale_references(self):
-        # Issue #16284: check that the executors don't unnecessarily hang onto
-        # references.
-        my_object = MyObject()
-        my_object_collected = threading.Event()
-        my_object_callback = weakref.ref(
-            my_object, lambda obj: my_object_collected.set())
-        # Deliberately discarding the future.
-        self.executor.submit(my_object.my_method)
-        del my_object
-
-        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
-        self.assertTrue(collected,
-                        "Stale reference not collected within timeout.")
-
-    def test_max_workers_negative(self):
-        for number in (0, -1):
-            with self.assertRaisesRegex(ValueError,
-                                        "max_workers must be greater "
-                                        "than 0"):
-                self.executor_type(max_workers=number)
-
-    def test_free_reference(self):
-        # Issue #14406: Result iterator should not keep an internal
-        # reference to result objects.
-        for obj in self.executor.map(make_dummy_object, range(10)):
-            wr = weakref.ref(obj)
-            del obj
-            support.gc_collect()  # For PyPy or other GCs.
-            self.assertIsNone(wr())
-
-
-class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
-    def test_map_submits_without_iteration(self):
-        """Tests verifying issue 11777."""
-        finished = []
-        def record_finished(n):
-            finished.append(n)
-
-        self.executor.map(record_finished, range(10))
-        self.executor.shutdown(wait=True)
-        self.assertCountEqual(finished, range(10))
-
-    def test_default_workers(self):
-        executor = self.executor_type()
-        expected = min(32, (os.cpu_count() or 1) + 4)
-        self.assertEqual(executor._max_workers, expected)
-
-    def test_saturation(self):
-        executor = self.executor_type(4)
-        def acquire_lock(lock):
-            lock.acquire()
-
-        sem = threading.Semaphore(0)
-        for i in range(15 * executor._max_workers):
-            executor.submit(acquire_lock, sem)
-        self.assertEqual(len(executor._threads), executor._max_workers)
-        for i in range(15 * executor._max_workers):
-            sem.release()
-        executor.shutdown(wait=True)
-
-    def test_idle_thread_reuse(self):
-        executor = self.executor_type()
-        executor.submit(mul, 21, 2).result()
-        executor.submit(mul, 6, 7).result()
-        executor.submit(mul, 3, 14).result()
-        self.assertEqual(len(executor._threads), 1)
-        executor.shutdown(wait=True)
-
-    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
-    @support.requires_resource('cpu')
-    def test_hang_global_shutdown_lock(self):
-        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
-        # process, otherwise it will never exit
-        def submit(pool):
-            pool.submit(submit, pool)
-
-        with futures.ThreadPoolExecutor(1) as pool:
-            pool.submit(submit, pool)
-
-            for _ in range(50):
-                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
-                    workers.submit(tuple)
-
-    def test_executor_map_current_future_cancel(self):
-        stop_event = threading.Event()
-        log = []
-
-        def log_n_wait(ident):
-            log.append(f"{ident=} started")
-            try:
-                stop_event.wait()
-            finally:
-                log.append(f"{ident=} stopped")
-
-        with self.executor_type(max_workers=1) as pool:
-            # submit work to saturate the pool
-            fut = pool.submit(log_n_wait, ident="first")
-            try:
-                with contextlib.closing(
-                    pool.map(log_n_wait, ["second", "third"], timeout=0)
-                ) as gen:
-                    with self.assertRaises(TimeoutError):
-                        next(gen)
-            finally:
-                stop_event.set()
-            fut.result()
-        # ident='second' is cancelled as a result of raising a TimeoutError
-        # ident='third' is cancelled because it remained in the collection of futures
-        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
-
-
-class ProcessPoolExecutorTest(ExecutorTest):
-
-    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
-    def test_max_workers_too_large(self):
-        with self.assertRaisesRegex(ValueError,
-                                    "max_workers must be <= 61"):
-            futures.ProcessPoolExecutor(max_workers=62)
-
-    def test_killed_child(self):
-        # When a child process is abruptly terminated, the whole pool gets
-        # "broken".
-        futures = [self.executor.submit(time.sleep, 3)]
-        # Get one of the processes, and terminate (kill) it
-        p = next(iter(self.executor._processes.values()))
-        p.terminate()
-        for fut in futures:
-            self.assertRaises(BrokenProcessPool, fut.result)
-        # Submitting other jobs fails as well.
-        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
-
-    def test_map_chunksize(self):
-        def bad_map():
-            list(self.executor.map(pow, range(40), range(40), chunksize=-1))
-
-        ref = list(map(pow, range(40), range(40)))
-        self.assertEqual(
-            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
-            ref)
-        self.assertEqual(
-            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
-            ref)
-        self.assertEqual(
-            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
-            ref)
-        self.assertRaises(ValueError, bad_map)
-
-    @classmethod
-    def _test_traceback(cls):
-        raise RuntimeError(123) # some comment
-
-    def test_traceback(self):
-        # We want to ensure that the traceback from the child process is
-        # contained in the traceback raised in the main process.
-        future = self.executor.submit(self._test_traceback)
-        with self.assertRaises(Exception) as cm:
-            future.result()
-
-        exc = cm.exception
-        self.assertIs(type(exc), RuntimeError)
-        self.assertEqual(exc.args, (123,))
-        cause = exc.__cause__
-        self.assertIs(type(cause), futures.process._RemoteTraceback)
-        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
-
-        with support.captured_stderr() as f1:
-            try:
-                raise exc
-            except RuntimeError:
-                sys.excepthook(*sys.exc_info())
-        self.assertIn('raise RuntimeError(123) # some comment',
-                      f1.getvalue())
-
-    @hashlib_helper.requires_hashdigest('md5')
-    def test_ressources_gced_in_workers(self):
-        # Ensure that the arguments for a job are correctly gc-ed after the
-        # job is finished
-        mgr = self.get_context().Manager()
-        obj = EventfulGCObj(mgr)
-        future = self.executor.submit(id, obj)
-        future.result()
-
-        self.assertTrue(obj.event.wait(timeout=1))
-
-        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
-        # is called while the manager is still running.
-        obj = None
-        support.gc_collect()
-
-        mgr.shutdown()
-        mgr.join()
-
-    def test_saturation(self):
-        executor = self.executor
-        mp_context = self.get_context()
-        sem = mp_context.Semaphore(0)
-        job_count = 15 * executor._max_workers
-        for _ in range(job_count):
-            executor.submit(sem.acquire)
-        self.assertEqual(len(executor._processes), executor._max_workers)
-        for _ in range(job_count):
-            sem.release()
-
-    def test_idle_process_reuse_one(self):
-        executor = self.executor
-        assert executor._max_workers >= 4
-        if self.get_context().get_start_method(allow_none=False) == "fork":
-            raise unittest.SkipTest("Incompatible with the fork start method.")
-        executor.submit(mul, 21, 2).result()
-        executor.submit(mul, 6, 7).result()
-        executor.submit(mul, 3, 14).result()
-        self.assertEqual(len(executor._processes), 1)
-
-    def test_idle_process_reuse_multiple(self):
-        executor = self.executor
-        assert executor._max_workers <= 5
-        if self.get_context().get_start_method(allow_none=False) == "fork":
-            raise unittest.SkipTest("Incompatible with the fork start method.")
-        executor.submit(mul, 12, 7).result()
-        executor.submit(mul, 33, 25)
-        executor.submit(mul, 25, 26).result()
-        executor.submit(mul, 18, 29)
-        executor.submit(mul, 1, 2).result()
-        executor.submit(mul, 0, 9)
-        self.assertLessEqual(len(executor._processes), 3)
-        executor.shutdown()
-
-    def test_max_tasks_per_child(self):
-        context = self.get_context()
-        if context.get_start_method(allow_none=False) == "fork":
-            with self.assertRaises(ValueError):
-                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
-            return
-        # not using self.executor as we need to control construction.
-        # arguably this could go in another class w/o that mixin.
-        executor = self.executor_type(
-                1, mp_context=context, max_tasks_per_child=3)
-        f1 = executor.submit(os.getpid)
-        original_pid = f1.result()
-        # The worker pid remains the same, as the worker can be reused
-        f2 = executor.submit(os.getpid)
-        self.assertEqual(f2.result(), original_pid)
-        self.assertEqual(len(executor._processes), 1)
-        f3 = executor.submit(os.getpid)
-        self.assertEqual(f3.result(), original_pid)
-
-        # A new worker is spawned, with a statistically different pid,
-        # while the previous one was reaped.
-        f4 = executor.submit(os.getpid)
-        new_pid = f4.result()
-        self.assertNotEqual(original_pid, new_pid)
-        self.assertEqual(len(executor._processes), 1)
-
-        executor.shutdown()
-
-    def test_max_tasks_per_child_defaults_to_spawn_context(self):
-        # not using self.executor as we need to control construction.
-        # arguably this could go in another class w/o that mixin.
-        executor = self.executor_type(1, max_tasks_per_child=3)
-        self.assertEqual(executor._mp_context.get_start_method(), "spawn")
-
-    def test_max_tasks_early_shutdown(self):
-        context = self.get_context()
-        if context.get_start_method(allow_none=False) == "fork":
-            raise unittest.SkipTest("Incompatible with the fork start method.")
-        # not using self.executor as we need to control construction.
-        # arguably this could go in another class w/o that mixin.
-        executor = self.executor_type(
-                3, mp_context=context, max_tasks_per_child=1)
-        futures = []
-        for i in range(6):
-            futures.append(executor.submit(mul, i, i))
-        executor.shutdown()
-        for i, future in enumerate(futures):
-            self.assertEqual(future.result(), mul(i, i))
-
-
-create_executor_tests(ProcessPoolExecutorTest,
-                      executor_mixins=(ProcessPoolForkMixin,
-                                       ProcessPoolForkserverMixin,
-                                       ProcessPoolSpawnMixin))
-
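-# Helpers and "bad objects" used by ExecutorDeadlockTest to simulate worker
-# failures at various stages: during function execution, and while pickling
-# or unpickling tasks and results.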
-def _crash(delay=None):
-    """Induces a segfault."""
-    if delay:
-        time.sleep(delay)
-    import faulthandler
-    faulthandler.disable()
-    faulthandler._sigsegv()
-
-
-def _crash_with_data(data):
-    """Induces a segfault with dummy data in input."""
-    _crash()
-
-
-def _exit():
-    """Induces a sys exit with exitcode 1."""
-    sys.exit(1)
-
-
-def _raise_error(Err):
-    """Function that raises an Exception in process."""
-    raise Err()
-
-
-def _raise_error_ignore_stderr(Err):
-    """Function that raises an Exception in process and ignores stderr."""
-    import io
-    sys.stderr = io.StringIO()
-    raise Err()
-
-
-def _return_instance(cls):
-    """Function that returns a instance of cls."""
-    return cls()
-
-
-class CrashAtPickle(object):
-    """Bad object that triggers a segfault at pickling time."""
-    def __reduce__(self):
-        _crash()
-
-
-class CrashAtUnpickle(object):
-    """Bad object that triggers a segfault at unpickling time."""
-    def __reduce__(self):
-        return _crash, ()
-
-
-class ExitAtPickle(object):
-    """Bad object that triggers a process exit at pickling time."""
-    def __reduce__(self):
-        _exit()
-
-
-class ExitAtUnpickle(object):
-    """Bad object that triggers a process exit at unpickling time."""
-    def __reduce__(self):
-        return _exit, ()
-
-
-class ErrorAtPickle(object):
-    """Bad object that triggers an error at pickling time."""
-    def __reduce__(self):
-        from pickle import PicklingError
-        raise PicklingError("Error in pickle")
-
-
-class ErrorAtUnpickle(object):
-    """Bad object that triggers an error at unpickling time."""
-    def __reduce__(self):
-        from pickle import UnpicklingError
-        return _raise_error_ignore_stderr, (UnpicklingError, )
-
-
-class ExecutorDeadlockTest:
-    TIMEOUT = support.SHORT_TIMEOUT
-
-    def _fail_on_deadlock(self, executor):
-        # If we did not recover before TIMEOUT seconds, consider that the
-        # executor is in a deadlock state and forcefully clean up all its
-        # components.
-        import faulthandler
-        from tempfile import TemporaryFile
-        with TemporaryFile(mode="w+") as f:
-            faulthandler.dump_traceback(file=f)
-            f.seek(0)
-            tb = f.read()
-        for p in executor._processes.values():
-            p.terminate()
-        # It should be safe to call executor.shutdown() here, as all possible
-        # deadlocks should have been broken.
-        executor.shutdown(wait=True)
-        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
-        self.fail(f"Executor deadlock:\n\n{tb}")
-
-
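-    # Replace the test's executor with a fresh two-worker pool, submit a
-    # callable that fails in the given way, and fail the test via
-    # _fail_on_deadlock() if the expected error does not surface within
-    # TIMEOUT.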
-    def _check_crash(self, error, func, *args, ignore_stderr=False):
-        # test for deadlock caused by crashes in a pool
-        self.executor.shutdown(wait=True)
-
-        executor = self.executor_type(
-            max_workers=2, mp_context=self.get_context())
-        res = executor.submit(func, *args)
-
-        if ignore_stderr:
-            cm = support.captured_stderr()
-        else:
-            cm = contextlib.nullcontext()
-
-        try:
-            with self.assertRaises(error):
-                with cm:
-                    res.result(timeout=self.TIMEOUT)
-        except futures.TimeoutError:
-            # If we did not recover before TIMEOUT seconds,
-            # consider that the executor is in a deadlock state
-            self._fail_on_deadlock(executor)
-        executor.shutdown(wait=True)
-
-    def test_error_at_task_pickle(self):
-        # Check problem occurring while pickling a task in
-        # the task_handler thread
-        self._check_crash(PicklingError, id, ErrorAtPickle())
-
-    def test_exit_at_task_unpickle(self):
-        # Check problem occurring while unpickling a task on workers
-        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
-
-    def test_error_at_task_unpickle(self):
-        # Check problem occurring while unpickling a task on workers
-        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
-
-    def test_crash_at_task_unpickle(self):
-        # Check problem occurring while unpickling a task on workers
-        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
-
-    def test_crash_during_func_exec_on_worker(self):
-        # Check problem occurring during func execution on workers
-        self._check_crash(BrokenProcessPool, _crash)
-
-    def test_exit_during_func_exec_on_worker(self):
-        # Check problem occurring during func execution on workers
-        self._check_crash(SystemExit, _exit)
-
-    def test_error_during_func_exec_on_worker(self):
-        # Check problem occurring during func execution on workers
-        self._check_crash(RuntimeError, _raise_error, RuntimeError)
-
-    def test_crash_during_result_pickle_on_worker(self):
-        # Check problem occurring while pickling a task result
-        # on workers
-        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
-
-    def test_exit_during_result_pickle_on_worker(self):
-        # Check problem occurring while pickling a task result
-        # on workers
-        self._check_crash(SystemExit, _return_instance, ExitAtPickle)
-
-    def test_error_during_result_pickle_on_worker(self):
-        # Check problem occurring while pickling a task result
-        # on workers
-        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
-
-    def test_error_during_result_unpickle_in_result_handler(self):
-        # Check problem occurring while unpickling a task in
-        # the result_handler thread
-        self._check_crash(BrokenProcessPool,
-                          _return_instance, ErrorAtUnpickle,
-                          ignore_stderr=True)
-
-    def test_exit_during_result_unpickle_in_result_handler(self):
-        # Check problem occurring while unpickling a task in
-        # the result_handler thread
-        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
-
-    def test_shutdown_deadlock(self):
-        # Test that calling shutdown on the pool does not cause a deadlock
-        # if a worker fails after the shutdown call.
-        self.executor.shutdown(wait=True)
-        with self.executor_type(max_workers=2,
-                                mp_context=self.get_context()) as executor:
-            self.executor = executor  # Allow clean up in fail_on_deadlock
-            f = executor.submit(_crash, delay=.1)
-            executor.shutdown(wait=True)
-            with self.assertRaises(BrokenProcessPool):
-                f.result()
-
-    def test_shutdown_deadlock_pickle(self):
-        # Test that the pool calling shutdown with wait=False does not cause
-        # a deadlock if a task fails at pickle after the shutdown call.
-        # Reported in bpo-39104.
-        self.executor.shutdown(wait=True)
-        with self.executor_type(max_workers=2,
-                                mp_context=self.get_context()) as executor:
-            self.executor = executor  # Allow clean up in fail_on_deadlock
-
-            # Start the executor and keep a reference to the
-            # executor_manager_thread so we can join it later, avoiding a
-            # dangling thread that would be cleaned up asynchronously.
-            executor.submit(id, 42).result()
-            executor_manager = executor._executor_manager_thread
-
-            # Submit a task that fails at pickle and shut down the executor
-            # without waiting
-            f = executor.submit(id, ErrorAtPickle())
-            executor.shutdown(wait=False)
-            with self.assertRaises(PicklingError):
-                f.result()
-
-        # Make sure the executor is eventually shut down and does not leave
-        # dangling threads
-        executor_manager.join()
-
-    def test_crash_big_data(self):
-        # Test that there is a clean exception instead of a deadlock when a
-        # child process crashes while some data is being written into the
-        # queue.
-        # https://github.com/python/cpython/issues/94777
-        self.executor.shutdown(wait=True)
-        data = "a" * support.PIPE_MAX_SIZE
-        with self.executor_type(max_workers=2,
-                                mp_context=self.get_context()) as executor:
-            self.executor = executor  # Allow clean up in fail_on_deadlock
-            with self.assertRaises(BrokenProcessPool):
-                list(executor.map(_crash_with_data, [data] * 10))
-
-
-create_executor_tests(ExecutorDeadlockTest,
-                      executor_mixins=(ProcessPoolForkMixin,
-                                       ProcessPoolForkserverMixin,
-                                       ProcessPoolSpawnMixin))
-
-
-class FutureTests(BaseTestCase):
-    def test_done_callback_with_result(self):
-        callback_result = None
-        def fn(callback_future):
-            nonlocal callback_result
-            callback_result = callback_future.result()
-
-        f = Future()
-        f.add_done_callback(fn)
-        f.set_result(5)
-        self.assertEqual(5, callback_result)
-
-    def test_done_callback_with_exception(self):
-        callback_exception = None
-        def fn(callback_future):
-            nonlocal callback_exception
-            callback_exception = callback_future.exception()
-
-        f = Future()
-        f.add_done_callback(fn)
-        f.set_exception(Exception('test'))
-        self.assertEqual(('test',), callback_exception.args)
-
-    def test_done_callback_with_cancel(self):
-        was_cancelled = None
-        def fn(callback_future):
-            nonlocal was_cancelled
-            was_cancelled = callback_future.cancelled()
-
-        f = Future()
-        f.add_done_callback(fn)
-        self.assertTrue(f.cancel())
-        self.assertTrue(was_cancelled)
-
-    def test_done_callback_raises(self):
-        with support.captured_stderr() as stderr:
-            raising_was_called = False
-            fn_was_called = False
-
-            def raising_fn(callback_future):
-                nonlocal raising_was_called
-                raising_was_called = True
-                raise Exception('doh!')
-
-            def fn(callback_future):
-                nonlocal fn_was_called
-                fn_was_called = True
-
-            f = Future()
-            f.add_done_callback(raising_fn)
-            f.add_done_callback(fn)
-            f.set_result(5)
-            self.assertTrue(raising_was_called)
-            self.assertTrue(fn_was_called)
-            self.assertIn('Exception: doh!', stderr.getvalue())
-
-    def test_done_callback_already_successful(self):
-        callback_result = None
-        def fn(callback_future):
-            nonlocal callback_result
-            callback_result = callback_future.result()
-
-        f = Future()
-        f.set_result(5)
-        f.add_done_callback(fn)
-        self.assertEqual(5, callback_result)
-
-    def test_done_callback_already_failed(self):
-        callback_exception = None
-        def fn(callback_future):
-            nonlocal callback_exception
-            callback_exception = callback_future.exception()
-
-        f = Future()
-        f.set_exception(Exception('test'))
-        f.add_done_callback(fn)
-        self.assertEqual(('test',), callback_exception.args)
-
-    def test_done_callback_already_cancelled(self):
-        was_cancelled = None
-        def fn(callback_future):
-            nonlocal was_cancelled
-            was_cancelled = callback_future.cancelled()
-
-        f = Future()
-        self.assertTrue(f.cancel())
-        f.add_done_callback(fn)
-        self.assertTrue(was_cancelled)
-
-    def test_done_callback_raises_already_succeeded(self):
-        with support.captured_stderr() as stderr:
-            def raising_fn(callback_future):
-                raise Exception('doh!')
-
-            f = Future()
-
-            # Set the result first to simulate a future that runs instantly,
-            # effectively allowing the callback to be run immediately.
-            f.set_result(5)
-            f.add_done_callback(raising_fn)
-
-            self.assertIn('exception calling callback for', stderr.getvalue())
-            self.assertIn('doh!', stderr.getvalue())
-
-
-    def test_repr(self):
-        self.assertRegex(repr(PENDING_FUTURE),
-                         '<Future at 0x[0-9a-f]+ state=pending>')
-        self.assertRegex(repr(RUNNING_FUTURE),
-                         '<Future at 0x[0-9a-f]+ state=running>')
-        self.assertRegex(repr(CANCELLED_FUTURE),
-                         '<Future at 0x[0-9a-f]+ state=cancelled>')
-        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
-                         '<Future at 0x[0-9a-f]+ state=cancelled>')
-        self.assertRegex(
-                repr(EXCEPTION_FUTURE),
-                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
-        self.assertRegex(
-                repr(SUCCESSFUL_FUTURE),
-                '<Future at 0x[0-9a-f]+ state=finished returned int>')
-
-
-    def test_cancel(self):
-        f1 = create_future(state=PENDING)
-        f2 = create_future(state=RUNNING)
-        f3 = create_future(state=CANCELLED)
-        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
-        f5 = create_future(state=FINISHED, exception=OSError())
-        f6 = create_future(state=FINISHED, result=5)
-
-        self.assertTrue(f1.cancel())
-        self.assertEqual(f1._state, CANCELLED)
-
-        self.assertFalse(f2.cancel())
-        self.assertEqual(f2._state, RUNNING)
-
-        self.assertTrue(f3.cancel())
-        self.assertEqual(f3._state, CANCELLED)
-
-        self.assertTrue(f4.cancel())
-        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
-
-        self.assertFalse(f5.cancel())
-        self.assertEqual(f5._state, FINISHED)
-
-        self.assertFalse(f6.cancel())
-        self.assertEqual(f6._state, FINISHED)
-
-    def test_cancelled(self):
-        self.assertFalse(PENDING_FUTURE.cancelled())
-        self.assertFalse(RUNNING_FUTURE.cancelled())
-        self.assertTrue(CANCELLED_FUTURE.cancelled())
-        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
-        self.assertFalse(EXCEPTION_FUTURE.cancelled())
-        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
-
-    def test_done(self):
-        self.assertFalse(PENDING_FUTURE.done())
-        self.assertFalse(RUNNING_FUTURE.done())
-        self.assertTrue(CANCELLED_FUTURE.done())
-        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
-        self.assertTrue(EXCEPTION_FUTURE.done())
-        self.assertTrue(SUCCESSFUL_FUTURE.done())
-
-    def test_running(self):
-        self.assertFalse(PENDING_FUTURE.running())
-        self.assertTrue(RUNNING_FUTURE.running())
-        self.assertFalse(CANCELLED_FUTURE.running())
-        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
-        self.assertFalse(EXCEPTION_FUTURE.running())
-        self.assertFalse(SUCCESSFUL_FUTURE.running())
-
-    def test_result_with_timeout(self):
-        self.assertRaises(futures.TimeoutError,
-                          PENDING_FUTURE.result, timeout=0)
-        self.assertRaises(futures.TimeoutError,
-                          RUNNING_FUTURE.result, timeout=0)
-        self.assertRaises(futures.CancelledError,
-                          CANCELLED_FUTURE.result, timeout=0)
-        self.assertRaises(futures.CancelledError,
-                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
-        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
-        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
-
-    def test_result_with_success(self):
-        # TODO(brian@sweetapp.com): This test is timing dependent.
-        def notification():
-            # Wait until the main thread is waiting for the result.
-            time.sleep(1)
-            f1.set_result(42)
-
-        f1 = create_future(state=PENDING)
-        t = threading.Thread(target=notification)
-        t.start()
-
-        self.assertEqual(f1.result(timeout=5), 42)
-        t.join()
-
-    def test_result_with_cancel(self):
-        # TODO(brian@sweetapp.com): This test is timing dependent.
-        def notification():
-            # Wait until the main thread is waiting for the result.
-            time.sleep(1)
-            f1.cancel()
-
-        f1 = create_future(state=PENDING)
-        t = threading.Thread(target=notification)
-        t.start()
-
-        self.assertRaises(futures.CancelledError,
-                          f1.result, timeout=support.SHORT_TIMEOUT)
-        t.join()
-
-    def test_exception_with_timeout(self):
-        self.assertRaises(futures.TimeoutError,
-                          PENDING_FUTURE.exception, timeout=0)
-        self.assertRaises(futures.TimeoutError,
-                          RUNNING_FUTURE.exception, timeout=0)
-        self.assertRaises(futures.CancelledError,
-                          CANCELLED_FUTURE.exception, timeout=0)
-        self.assertRaises(futures.CancelledError,
-                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
-        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
-                                   OSError))
-        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
-
-    def test_exception_with_success(self):
-        def notification():
-            # Wait until the main thread is waiting for the exception.
-            time.sleep(1)
-            with f1._condition:
-                f1._state = FINISHED
-                f1._exception = OSError()
-                f1._condition.notify_all()
-
-        f1 = create_future(state=PENDING)
-        t = threading.Thread(target=notification)
-        t.start()
-
-        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
-        t.join()
-
-    def test_multiple_set_result(self):
-        f = create_future(state=PENDING)
-        f.set_result(1)
-
-        with self.assertRaisesRegex(
-                futures.InvalidStateError,
-                'FINISHED: <Future at 0x[0-9a-f]+ '
-                'state=finished returned int>'
-        ):
-            f.set_result(2)
-
-        self.assertTrue(f.done())
-        self.assertEqual(f.result(), 1)
-
-    def test_multiple_set_exception(self):
-        f = create_future(state=PENDING)
-        e = ValueError()
-        f.set_exception(e)
-
-        with self.assertRaisesRegex(
-                futures.InvalidStateError,
-                'FINISHED: <Future at 0x[0-9a-f]+ '
-                'state=finished raised ValueError>'
-        ):
-            f.set_exception(Exception())
-
-        self.assertEqual(f.exception(), e)
-
-
-def setUpModule():
-    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
-    thread_info = threading_helper.threading_setup()
-    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/Lib/test/test_concurrent_futures/__init__.py b/Lib/test/test_concurrent_futures/__init__.py
new file mode 100644 (file)
index 0000000..430fa93
--- /dev/null
@@ -0,0 +1,16 @@
+import os.path
+import unittest
+from test import support
+from test.support import import_helper
+
+# Skip tests if _multiprocessing wasn't built.
+import_helper.import_module('_multiprocessing')
+
+if support.check_sanitizer(address=True, memory=True):
+    # gh-90791: Skip the test because it is too slow when Python is built
+    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
+    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
+
+
+def load_tests(*args):
+    return support.load_package_tests(os.path.dirname(__file__), *args)
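The load_tests hook above is what lets unittest treat the package as a single
test target. A minimal sketch of what a helper like support.load_package_tests
does under the hood (illustrative, not the actual helper):

    import os.path
    import unittest

    def load_tests(loader, standard_tests, pattern):
        # Discover test_*.py modules next to this __init__.py and merge
        # them into the suite unittest builds for the package.
        this_dir = os.path.dirname(__file__)
        package_tests = loader.discover(start_dir=this_dir, pattern='test_*.py')
        standard_tests.addTests(package_tests)
        return standard_tests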
index 1e7d4344740943d54285fb455d1ca345d3880ad0..36278bdd50197119d855fa4c168338aff1fdaa73 100644 (file)
@@ -53,7 +53,6 @@ class ExecutorTest:
         self.assertEqual(i.__next__(), (0, 1))
         self.assertRaises(ZeroDivisionError, i.__next__)
 
-    @support.requires_resource('walltime')
     def test_map_timeout(self):
         results = []
         try:
diff --git a/Lib/test/test_concurrent_futures/test_as_completed.py b/Lib/test/test_concurrent_futures/test_as_completed.py
new file mode 100644 (file)
index 0000000..de53349
--- /dev/null
@@ -0,0 +1,111 @@
+import itertools
+import time
+import unittest
+import weakref
+from concurrent import futures
+from concurrent.futures._base import (
+    CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+from test import support
+
+from .util import (
+    PENDING_FUTURE, RUNNING_FUTURE,
+    CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
+    create_future, create_executor_tests, setup_module)
+
+
+def mul(x, y):
+    return x * y
+
+
+class AsCompletedTests:
+    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
+    def test_no_timeout(self):
+        future1 = self.executor.submit(mul, 2, 21)
+        future2 = self.executor.submit(mul, 7, 6)
+
+        completed = set(futures.as_completed(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]))
+        self.assertEqual(set(
+                [CANCELLED_AND_NOTIFIED_FUTURE,
+                 EXCEPTION_FUTURE,
+                 SUCCESSFUL_FUTURE,
+                 future1, future2]),
+                completed)
+
+    def test_zero_timeout(self):
+        future1 = self.executor.submit(time.sleep, 2)
+        completed_futures = set()
+        try:
+            for future in futures.as_completed(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future1],
+                    timeout=0):
+                completed_futures.add(future)
+        except futures.TimeoutError:
+            pass
+
+        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                              EXCEPTION_FUTURE,
+                              SUCCESSFUL_FUTURE]),
+                         completed_futures)
+
+    def test_duplicate_futures(self):
+        # Issue 20367. Duplicate futures should not raise exceptions or give
+        # duplicate responses.
+        # Issue #31641: accept arbitrary iterables.
+        future1 = self.executor.submit(time.sleep, 2)
+        completed = [
+            f for f in futures.as_completed(itertools.repeat(future1, 3))
+        ]
+        self.assertEqual(len(completed), 1)
+
+    def test_free_reference_yielded_future(self):
+        # Issue #14406: Generator should not keep references
+        # to finished futures.
+        futures_list = [Future() for _ in range(8)]
+        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
+        futures_list.append(create_future(state=FINISHED, result=42))
+
+        with self.assertRaises(futures.TimeoutError):
+            for future in futures.as_completed(futures_list, timeout=0):
+                futures_list.remove(future)
+                wr = weakref.ref(future)
+                del future
+                support.gc_collect()  # For PyPy or other GCs.
+                self.assertIsNone(wr())
+
+        futures_list[0].set_result("test")
+        for future in futures.as_completed(futures_list):
+            futures_list.remove(future)
+            wr = weakref.ref(future)
+            del future
+            support.gc_collect()  # For PyPy or other GCs.
+            self.assertIsNone(wr())
+            if futures_list:
+                futures_list[0].set_result("test")
+
+    def test_correct_timeout_exception_msg(self):
+        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
+                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]
+
+        with self.assertRaises(futures.TimeoutError) as cm:
+            list(futures.as_completed(futures_list, timeout=0))
+
+        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
+
+
+create_executor_tests(globals(), AsCompletedTests)
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
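The zero-timeout behaviour exercised by test_zero_timeout() can be summarised
in a few lines: with timeout=0, as_completed() yields whatever is already done
and then raises TimeoutError for the rest. A minimal sketch (illustrative
names, not part of the test suite):

    import time
    from concurrent import futures

    executor = futures.ThreadPoolExecutor(max_workers=1)
    fast = executor.submit(pow, 2, 10)
    fast.result()                          # guaranteed to be finished
    slow = executor.submit(time.sleep, 1)
    done = []
    try:
        # timeout=0: yield what is already done, then raise TimeoutError
        for f in futures.as_completed([fast, slow], timeout=0):
            done.append(f)
    except futures.TimeoutError:
        pass                               # 'slow' was still unfinished
    assert done == [fast]
    executor.shutdown(wait=True)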
diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
new file mode 100644 (file)
index 0000000..6b78b36
--- /dev/null
@@ -0,0 +1,253 @@
+import contextlib
+import sys
+import time
+import unittest
+from pickle import PicklingError
+from concurrent import futures
+from concurrent.futures.process import BrokenProcessPool
+
+from test import support
+
+from .util import (
+    create_executor_tests, setup_module,
+    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
+
+
+def _crash(delay=None):
+    """Induces a segfault."""
+    if delay:
+        time.sleep(delay)
+    import faulthandler
+    faulthandler.disable()
+    faulthandler._sigsegv()
+
+
+def _crash_with_data(data):
+    """Induces a segfault with dummy data in input."""
+    _crash()
+
+
+def _exit():
+    """Induces a sys exit with exitcode 1."""
+    sys.exit(1)
+
+
+def _raise_error(Err):
+    """Function that raises an Exception in process."""
+    raise Err()
+
+
+def _raise_error_ignore_stderr(Err):
+    """Function that raises an Exception in process and ignores stderr."""
+    import io
+    sys.stderr = io.StringIO()
+    raise Err()
+
+
+def _return_instance(cls):
+    """Function that returns a instance of cls."""
+    return cls()
+
+
+class CrashAtPickle(object):
+    """Bad object that triggers a segfault at pickling time."""
+    def __reduce__(self):
+        _crash()
+
+
+class CrashAtUnpickle(object):
+    """Bad object that triggers a segfault at unpickling time."""
+    def __reduce__(self):
+        return _crash, ()
+
+
+class ExitAtPickle(object):
+    """Bad object that triggers a process exit at pickling time."""
+    def __reduce__(self):
+        _exit()
+
+
+class ExitAtUnpickle(object):
+    """Bad object that triggers a process exit at unpickling time."""
+    def __reduce__(self):
+        return _exit, ()
+
+
+class ErrorAtPickle(object):
+    """Bad object that triggers an error at pickling time."""
+    def __reduce__(self):
+        from pickle import PicklingError
+        raise PicklingError("Error in pickle")
+
+
+class ErrorAtUnpickle(object):
+    """Bad object that triggers an error at unpickling time."""
+    def __reduce__(self):
+        from pickle import UnpicklingError
+        return _raise_error_ignore_stderr, (UnpicklingError, )
+
+
+class ExecutorDeadlockTest:
+    TIMEOUT = support.SHORT_TIMEOUT
+
+    def _fail_on_deadlock(self, executor):
+        # If we did not recover before TIMEOUT seconds, consider that the
+        # executor is in a deadlock state and forcefully clean up all of
+        # its components.
+        import faulthandler
+        from tempfile import TemporaryFile
+        with TemporaryFile(mode="w+") as f:
+            faulthandler.dump_traceback(file=f)
+            f.seek(0)
+            tb = f.read()
+        for p in executor._processes.values():
+            p.terminate()
+        # It should be safe to call executor.shutdown() here, as all
+        # possible deadlocks should have been broken.
+        executor.shutdown(wait=True)
+        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
+        self.fail(f"Executor deadlock:\n\n{tb}")
+
+
+    def _check_crash(self, error, func, *args, ignore_stderr=False):
+        # test for deadlock caused by crashes in a pool
+        self.executor.shutdown(wait=True)
+
+        executor = self.executor_type(
+            max_workers=2, mp_context=self.get_context())
+        res = executor.submit(func, *args)
+
+        if ignore_stderr:
+            cm = support.captured_stderr()
+        else:
+            cm = contextlib.nullcontext()
+
+        try:
+            with self.assertRaises(error):
+                with cm:
+                    res.result(timeout=self.TIMEOUT)
+        except futures.TimeoutError:
+            # If we did not recover before TIMEOUT seconds,
+            # consider that the executor is in a deadlock state
+            self._fail_on_deadlock(executor)
+        executor.shutdown(wait=True)
+
+    def test_error_at_task_pickle(self):
+        # Check problem occurring while pickling a task in
+        # the task_handler thread
+        self._check_crash(PicklingError, id, ErrorAtPickle())
+
+    def test_exit_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
+
+    def test_error_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
+
+    def test_crash_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
+
+    def test_crash_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(BrokenProcessPool, _crash)
+
+    def test_exit_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(SystemExit, _exit)
+
+    def test_error_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(RuntimeError, _raise_error, RuntimeError)
+
+    def test_crash_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
+
+    def test_exit_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(SystemExit, _return_instance, ExitAtPickle)
+
+    def test_error_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
+
+    def test_error_during_result_unpickle_in_result_handler(self):
+        # Check problem occurring while unpickling a task in
+        # the result_handler thread
+        self._check_crash(BrokenProcessPool,
+                          _return_instance, ErrorAtUnpickle,
+                          ignore_stderr=True)
+
+    def test_exit_during_result_unpickle_in_result_handler(self):
+        # Check problem occurring while unpickling a task in
+        # the result_handler thread
+        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
+
+    def test_shutdown_deadlock(self):
+        # Test that calling shutdown on the pool does not cause a deadlock
+        # if a worker fails after the shutdown call.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=self.get_context()) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+            f = executor.submit(_crash, delay=.1)
+            executor.shutdown(wait=True)
+            with self.assertRaises(BrokenProcessPool):
+                f.result()
+
+    def test_shutdown_deadlock_pickle(self):
+        # Test that the pool calling shutdown with wait=False does not cause
+        # a deadlock if a task fails at pickle after the shutdown call.
+        # Reported in bpo-39104.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=self.get_context()) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+
+            # Start the executor and get the executor_manager_thread to
+            # collect the threads and avoid dangling threads that would
+            # otherwise be cleaned up asynchronously.
+            executor.submit(id, 42).result()
+            executor_manager = executor._executor_manager_thread
+
+            # Submit a task that fails at pickle and shut down the executor
+            # without waiting.
+            f = executor.submit(id, ErrorAtPickle())
+            executor.shutdown(wait=False)
+            with self.assertRaises(PicklingError):
+                f.result()
+
+        # Make sure the executor is eventually shut down and does not leave
+        # dangling threads.
+        executor_manager.join()
+
+    def test_crash_big_data(self):
+        # Test that there is a clean exception instead of a deadlock when a
+        # child process crashes while some data is being written into the
+        # queue.
+        # https://github.com/python/cpython/issues/94777
+        self.executor.shutdown(wait=True)
+        data = "a" * support.PIPE_MAX_SIZE
+        with self.executor_type(max_workers=2,
+                                mp_context=self.get_context()) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+            with self.assertRaises(BrokenProcessPool):
+                list(executor.map(_crash_with_data, [data] * 10))
+
+
+create_executor_tests(globals(), ExecutorDeadlockTest,
+                      executor_mixins=(ProcessPoolForkMixin,
+                                       ProcessPoolForkserverMixin,
+                                       ProcessPoolSpawnMixin))
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
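The Crash/Exit/Error helper classes above all rely on the same trick:
__reduce__() runs arbitrary code at pickling time, and the callable it returns
runs at unpickling time. A minimal sketch of that mechanism (illustrative
names, not part of the test suite):

    import pickle

    def boom():
        raise RuntimeError('runs at unpickling time')

    class FailsAtUnpickle:
        def __reduce__(self):
            # Unpickling will call boom() to "reconstruct" the object.
            return boom, ()

    data = pickle.dumps(FailsAtUnpickle())   # pickling itself succeeds
    try:
        pickle.loads(data)
    except RuntimeError as exc:
        print(exc)                           # runs at unpickling time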
diff --git a/Lib/test/test_concurrent_futures/test_future.py b/Lib/test/test_concurrent_futures/test_future.py
new file mode 100644 (file)
index 0000000..4066ea1
--- /dev/null
@@ -0,0 +1,291 @@
+import threading
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures._base import (
+    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+from test import support
+
+from .util import (
+    PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
+    CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
+    BaseTestCase, create_future, setup_module)
+
+
+class FutureTests(BaseTestCase):
+    def test_done_callback_with_result(self):
+        callback_result = None
+        def fn(callback_future):
+            nonlocal callback_result
+            callback_result = callback_future.result()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_result(5)
+        self.assertEqual(5, callback_result)
+
+    def test_done_callback_with_exception(self):
+        callback_exception = None
+        def fn(callback_future):
+            nonlocal callback_exception
+            callback_exception = callback_future.exception()
+
+        f = Future()
+        f.add_done_callback(fn)
+        f.set_exception(Exception('test'))
+        self.assertEqual(('test',), callback_exception.args)
+
+    def test_done_callback_with_cancel(self):
+        was_cancelled = None
+        def fn(callback_future):
+            nonlocal was_cancelled
+            was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        f.add_done_callback(fn)
+        self.assertTrue(f.cancel())
+        self.assertTrue(was_cancelled)
+
+    def test_done_callback_raises(self):
+        with support.captured_stderr() as stderr:
+            raising_was_called = False
+            fn_was_called = False
+
+            def raising_fn(callback_future):
+                nonlocal raising_was_called
+                raising_was_called = True
+                raise Exception('doh!')
+
+            def fn(callback_future):
+                nonlocal fn_was_called
+                fn_was_called = True
+
+            f = Future()
+            f.add_done_callback(raising_fn)
+            f.add_done_callback(fn)
+            f.set_result(5)
+            self.assertTrue(raising_was_called)
+            self.assertTrue(fn_was_called)
+            self.assertIn('Exception: doh!', stderr.getvalue())
+
+    def test_done_callback_already_successful(self):
+        callback_result = None
+        def fn(callback_future):
+            nonlocal callback_result
+            callback_result = callback_future.result()
+
+        f = Future()
+        f.set_result(5)
+        f.add_done_callback(fn)
+        self.assertEqual(5, callback_result)
+
+    def test_done_callback_already_failed(self):
+        callback_exception = None
+        def fn(callback_future):
+            nonlocal callback_exception
+            callback_exception = callback_future.exception()
+
+        f = Future()
+        f.set_exception(Exception('test'))
+        f.add_done_callback(fn)
+        self.assertEqual(('test',), callback_exception.args)
+
+    def test_done_callback_already_cancelled(self):
+        was_cancelled = None
+        def fn(callback_future):
+            nonlocal was_cancelled
+            was_cancelled = callback_future.cancelled()
+
+        f = Future()
+        self.assertTrue(f.cancel())
+        f.add_done_callback(fn)
+        self.assertTrue(was_cancelled)
+
+    def test_done_callback_raises_already_succeeded(self):
+        with support.captured_stderr() as stderr:
+            def raising_fn(callback_future):
+                raise Exception('doh!')
+
+            f = Future()
+
+            # Set the result first to simulate a future that runs instantly,
+            # effectively allowing the callback to be run immediately.
+            f.set_result(5)
+            f.add_done_callback(raising_fn)
+
+            self.assertIn('exception calling callback for', stderr.getvalue())
+            self.assertIn('doh!', stderr.getvalue())
+
+
+    def test_repr(self):
+        self.assertRegex(repr(PENDING_FUTURE),
+                         '<Future at 0x[0-9a-f]+ state=pending>')
+        self.assertRegex(repr(RUNNING_FUTURE),
+                         '<Future at 0x[0-9a-f]+ state=running>')
+        self.assertRegex(repr(CANCELLED_FUTURE),
+                         '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+                         '<Future at 0x[0-9a-f]+ state=cancelled>')
+        self.assertRegex(
+                repr(EXCEPTION_FUTURE),
+                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
+        self.assertRegex(
+                repr(SUCCESSFUL_FUTURE),
+                '<Future at 0x[0-9a-f]+ state=finished returned int>')
+
+    def test_cancel(self):
+        f1 = create_future(state=PENDING)
+        f2 = create_future(state=RUNNING)
+        f3 = create_future(state=CANCELLED)
+        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+        f5 = create_future(state=FINISHED, exception=OSError())
+        f6 = create_future(state=FINISHED, result=5)
+
+        self.assertTrue(f1.cancel())
+        self.assertEqual(f1._state, CANCELLED)
+
+        self.assertFalse(f2.cancel())
+        self.assertEqual(f2._state, RUNNING)
+
+        self.assertTrue(f3.cancel())
+        self.assertEqual(f3._state, CANCELLED)
+
+        self.assertTrue(f4.cancel())
+        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
+
+        self.assertFalse(f5.cancel())
+        self.assertEqual(f5._state, FINISHED)
+
+        self.assertFalse(f6.cancel())
+        self.assertEqual(f6._state, FINISHED)
+
+    def test_cancelled(self):
+        self.assertFalse(PENDING_FUTURE.cancelled())
+        self.assertFalse(RUNNING_FUTURE.cancelled())
+        self.assertTrue(CANCELLED_FUTURE.cancelled())
+        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+        self.assertFalse(EXCEPTION_FUTURE.cancelled())
+        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+    def test_done(self):
+        self.assertFalse(PENDING_FUTURE.done())
+        self.assertFalse(RUNNING_FUTURE.done())
+        self.assertTrue(CANCELLED_FUTURE.done())
+        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+        self.assertTrue(EXCEPTION_FUTURE.done())
+        self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+    def test_running(self):
+        self.assertFalse(PENDING_FUTURE.running())
+        self.assertTrue(RUNNING_FUTURE.running())
+        self.assertFalse(CANCELLED_FUTURE.running())
+        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
+        self.assertFalse(EXCEPTION_FUTURE.running())
+        self.assertFalse(SUCCESSFUL_FUTURE.running())
+
+    def test_result_with_timeout(self):
+        self.assertRaises(futures.TimeoutError,
+                          PENDING_FUTURE.result, timeout=0)
+        self.assertRaises(futures.TimeoutError,
+                          RUNNING_FUTURE.result, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_FUTURE.result, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
+        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+    def test_result_with_success(self):
+        # TODO(brian@sweetapp.com): This test is timing dependent.
+        def notification():
+            # Wait until the main thread is waiting for the result.
+            time.sleep(1)
+            f1.set_result(42)
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertEqual(f1.result(timeout=5), 42)
+        t.join()
+
+    def test_result_with_cancel(self):
+        # TODO(brian@sweetapp.com): This test is timing dependent.
+        def notification():
+            # Wait until the main thread is waiting for the result.
+            time.sleep(1)
+            f1.cancel()
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertRaises(futures.CancelledError,
+                          f1.result, timeout=support.SHORT_TIMEOUT)
+        t.join()
+
+    def test_exception_with_timeout(self):
+        self.assertRaises(futures.TimeoutError,
+                          PENDING_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.TimeoutError,
+                          RUNNING_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_FUTURE.exception, timeout=0)
+        self.assertRaises(futures.CancelledError,
+                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+                                   OSError))
+        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+    def test_exception_with_success(self):
+        def notification():
+            # Wait until the main thread is waiting for the exception.
+            time.sleep(1)
+            with f1._condition:
+                f1._state = FINISHED
+                f1._exception = OSError()
+                f1._condition.notify_all()
+
+        f1 = create_future(state=PENDING)
+        t = threading.Thread(target=notification)
+        t.start()
+
+        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
+        t.join()
+
+    def test_multiple_set_result(self):
+        f = create_future(state=PENDING)
+        f.set_result(1)
+
+        with self.assertRaisesRegex(
+                futures.InvalidStateError,
+                'FINISHED: <Future at 0x[0-9a-f]+ '
+                'state=finished returned int>'
+        ):
+            f.set_result(2)
+
+        self.assertTrue(f.done())
+        self.assertEqual(f.result(), 1)
+
+    def test_multiple_set_exception(self):
+        f = create_future(state=PENDING)
+        e = ValueError()
+        f.set_exception(e)
+
+        with self.assertRaisesRegex(
+                futures.InvalidStateError,
+                'FINISHED: <Future at 0x[0-9a-f]+ '
+                'state=finished raised ValueError>'
+        ):
+            f.set_exception(Exception())
+
+        self.assertEqual(f.exception(), e)
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
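Several of the tests above depend on one detail of add_done_callback(): if the
future is already resolved, the callback runs synchronously in the calling
thread. A minimal sketch (illustrative):

    from concurrent.futures import Future

    f = Future()
    f.set_result(5)
    seen = []
    # The future is already done, so the callback fires immediately.
    f.add_done_callback(lambda fut: seen.append(fut.result()))
    assert seen == [5]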
diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py
new file mode 100644 (file)
index 0000000..138d6ee
--- /dev/null
@@ -0,0 +1,117 @@
+import contextlib
+import logging
+import queue
+import time
+import unittest
+from concurrent.futures._base import BrokenExecutor
+from logging.handlers import QueueHandler
+
+from test import support
+
+from .util import ExecutorMixin, create_executor_tests, setup_module
+
+
+INITIALIZER_STATUS = 'uninitialized'
+
+def init(x):
+    global INITIALIZER_STATUS
+    INITIALIZER_STATUS = x
+
+def get_init_status():
+    return INITIALIZER_STATUS
+
+def init_fail(log_queue=None):
+    if log_queue is not None:
+        logger = logging.getLogger('concurrent.futures')
+        logger.addHandler(QueueHandler(log_queue))
+        logger.setLevel('CRITICAL')
+        logger.propagate = False
+    time.sleep(0.1)  # let some futures be scheduled
+    raise ValueError('error in initializer')
+
+
+class InitializerMixin(ExecutorMixin):
+    worker_count = 2
+
+    def setUp(self):
+        global INITIALIZER_STATUS
+        INITIALIZER_STATUS = 'uninitialized'
+        self.executor_kwargs = dict(initializer=init,
+                                    initargs=('initialized',))
+        super().setUp()
+
+    def test_initializer(self):
+        futures = [self.executor.submit(get_init_status)
+                   for _ in range(self.worker_count)]
+
+        for f in futures:
+            self.assertEqual(f.result(), 'initialized')
+
+
+class FailingInitializerMixin(ExecutorMixin):
+    worker_count = 2
+
+    def setUp(self):
+        if hasattr(self, "ctx"):
+            # Pass a queue to redirect the child's logging output
+            self.mp_context = self.get_context()
+            self.log_queue = self.mp_context.Queue()
+            self.executor_kwargs = dict(initializer=init_fail,
+                                        initargs=(self.log_queue,))
+        else:
+            # In a thread pool, the child shares our logging setup
+            # (see _assert_logged())
+            self.mp_context = None
+            self.log_queue = None
+            self.executor_kwargs = dict(initializer=init_fail)
+        super().setUp()
+
+    def test_initializer(self):
+        with self._assert_logged('ValueError: error in initializer'):
+            try:
+                future = self.executor.submit(get_init_status)
+            except BrokenExecutor:
+                # Perhaps the executor is already broken
+                pass
+            else:
+                with self.assertRaises(BrokenExecutor):
+                    future.result()
+
+            # At some point, the executor should break
+            t1 = time.monotonic()
+            while not self.executor._broken:
+                if time.monotonic() - t1 > 5:
+                    self.fail("executor not broken after 5 s.")
+                time.sleep(0.01)
+            # ... and from this point submit() is guaranteed to fail
+            with self.assertRaises(BrokenExecutor):
+                self.executor.submit(get_init_status)
+
+    @contextlib.contextmanager
+    def _assert_logged(self, msg):
+        if self.log_queue is not None:
+            yield
+            output = []
+            try:
+                while True:
+                    output.append(self.log_queue.get_nowait().getMessage())
+            except queue.Empty:
+                pass
+        else:
+            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
+                yield
+            output = cm.output
+        self.assertTrue(any(msg in line for line in output),
+                        output)
+
+
+create_executor_tests(globals(), InitializerMixin)
+create_executor_tests(globals(), FailingInitializerMixin)
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
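For reference, the initializer/initargs API that InitializerMixin exercises:
each worker runs the initializer once before accepting tasks, and if the
initializer raises, the executor breaks. A minimal sketch using a thread pool
(illustrative values):

    import threading
    from concurrent import futures

    state = {}

    def init(value):
        # Runs once in each worker thread before it takes any task.
        state[threading.get_ident()] = value

    with futures.ThreadPoolExecutor(
            max_workers=2, initializer=init, initargs=('initialized',)) as ex:
        f = ex.submit(lambda: state[threading.get_ident()])
        assert f.result() == 'initialized'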
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
new file mode 100644 (file)
index 0000000..7763a49
--- /dev/null
@@ -0,0 +1,202 @@
+import os
+import sys
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures.process import BrokenProcessPool
+
+from test import support
+from test.support import hashlib_helper
+
+from .executor import ExecutorTest, mul
+from .util import (
+    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
+    create_executor_tests, setup_module)
+
+
+class EventfulGCObj():
+    def __init__(self, mgr):
+        self.event = mgr.Event()
+
+    def __del__(self):
+        self.event.set()
+
+
+class ProcessPoolExecutorTest(ExecutorTest):
+
+    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
+    def test_max_workers_too_large(self):
+        with self.assertRaisesRegex(ValueError,
+                                    "max_workers must be <= 61"):
+            futures.ProcessPoolExecutor(max_workers=62)
+
+    def test_killed_child(self):
+        # When a child process is abruptly terminated, the whole pool gets
+        # "broken".
+        futures = [self.executor.submit(time.sleep, 3)]
+        # Get one of the processes, and terminate (kill) it
+        p = next(iter(self.executor._processes.values()))
+        p.terminate()
+        for fut in futures:
+            self.assertRaises(BrokenProcessPool, fut.result)
+        # Submitting other jobs fails as well.
+        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
+
+    def test_map_chunksize(self):
+        def bad_map():
+            list(self.executor.map(pow, range(40), range(40), chunksize=-1))
+
+        ref = list(map(pow, range(40), range(40)))
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
+            ref)
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
+            ref)
+        self.assertEqual(
+            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
+            ref)
+        self.assertRaises(ValueError, bad_map)
+
+    @classmethod
+    def _test_traceback(cls):
+        raise RuntimeError(123) # some comment
+
+    def test_traceback(self):
+        # We want to ensure that the traceback from the child process is
+        # contained in the traceback raised in the main process.
+        future = self.executor.submit(self._test_traceback)
+        with self.assertRaises(Exception) as cm:
+            future.result()
+
+        exc = cm.exception
+        self.assertIs(type(exc), RuntimeError)
+        self.assertEqual(exc.args, (123,))
+        cause = exc.__cause__
+        self.assertIs(type(cause), futures.process._RemoteTraceback)
+        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
+
+        with support.captured_stderr() as f1:
+            try:
+                raise exc
+            except RuntimeError:
+                sys.excepthook(*sys.exc_info())
+        self.assertIn('raise RuntimeError(123) # some comment',
+                      f1.getvalue())
+
+    @hashlib_helper.requires_hashdigest('md5')
+    def test_ressources_gced_in_workers(self):
+        # Ensure that the arguments for a job are correctly gc-ed after
+        # the job finishes.
+        mgr = self.get_context().Manager()
+        obj = EventfulGCObj(mgr)
+        future = self.executor.submit(id, obj)
+        future.result()
+
+        self.assertTrue(obj.event.wait(timeout=1))
+
+        # Explicitly destroy the object to ensure that EventfulGCObj.__del__()
+        # is called while the manager is still running.
+        obj = None
+        support.gc_collect()
+
+        mgr.shutdown()
+        mgr.join()
+
+    def test_saturation(self):
+        executor = self.executor
+        mp_context = self.get_context()
+        sem = mp_context.Semaphore(0)
+        job_count = 15 * executor._max_workers
+        for _ in range(job_count):
+            executor.submit(sem.acquire)
+        self.assertEqual(len(executor._processes), executor._max_workers)
+        for _ in range(job_count):
+            sem.release()
+
+    def test_idle_process_reuse_one(self):
+        executor = self.executor
+        assert executor._max_workers >= 4
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
+        executor.submit(mul, 21, 2).result()
+        executor.submit(mul, 6, 7).result()
+        executor.submit(mul, 3, 14).result()
+        self.assertEqual(len(executor._processes), 1)
+
+    def test_idle_process_reuse_multiple(self):
+        executor = self.executor
+        assert executor._max_workers <= 5
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
+        executor.submit(mul, 12, 7).result()
+        executor.submit(mul, 33, 25)
+        executor.submit(mul, 25, 26).result()
+        executor.submit(mul, 18, 29)
+        executor.submit(mul, 1, 2).result()
+        executor.submit(mul, 0, 9)
+        self.assertLessEqual(len(executor._processes), 3)
+        executor.shutdown()
+
+    def test_max_tasks_per_child(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            with self.assertRaises(ValueError):
+                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
+            return
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+                1, mp_context=context, max_tasks_per_child=3)
+        f1 = executor.submit(os.getpid)
+        original_pid = f1.result()
+        # The worker pid remains the same as the worker could be reused
+        f2 = executor.submit(os.getpid)
+        self.assertEqual(f2.result(), original_pid)
+        self.assertEqual(len(executor._processes), 1)
+        f3 = executor.submit(os.getpid)
+        self.assertEqual(f3.result(), original_pid)
+
+        # A new worker is spawned, with a statistically different pid,
+        # after the previous one was reaped.
+        f4 = executor.submit(os.getpid)
+        new_pid = f4.result()
+        self.assertNotEqual(original_pid, new_pid)
+        self.assertEqual(len(executor._processes), 1)
+
+        executor.shutdown()
+
+    def test_max_tasks_per_child_defaults_to_spawn_context(self):
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(1, max_tasks_per_child=3)
+        self.assertEqual(executor._mp_context.get_start_method(), "spawn")
+
+    def test_max_tasks_early_shutdown(self):
+        context = self.get_context()
+        if context.get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+                3, mp_context=context, max_tasks_per_child=1)
+        futures = []
+        for i in range(6):
+            futures.append(executor.submit(mul, i, i))
+        executor.shutdown()
+        for i, future in enumerate(futures):
+            self.assertEqual(future.result(), mul(i, i))
+
+
+create_executor_tests(globals(), ProcessPoolExecutorTest,
+                      executor_mixins=(ProcessPoolForkMixin,
+                                       ProcessPoolForkserverMixin,
+                                       ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
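The max_tasks_per_child tests above hinge on one behaviour new in 3.11: a
worker process exits after completing the given number of tasks and is
replaced by a fresh one. A minimal sketch, assuming a spawn-capable platform
(illustrative):

    import os
    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as ex:
            first = ex.submit(os.getpid).result()
            second = ex.submit(os.getpid).result()
            third = ex.submit(os.getpid).result()
            assert first == second    # same worker served the first two tasks
            assert third != first     # a replacement worker served the third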
diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py
new file mode 100644 (file)
index 0000000..45dab7a
--- /dev/null
@@ -0,0 +1,343 @@
+import signal
+import sys
+import threading
+import time
+import unittest
+from concurrent import futures
+
+from test import support
+from test.support.script_helper import assert_python_ok
+
+from .util import (
+    BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
+    ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
+    create_executor_tests, setup_module)
+
+
+def sleep_and_print(t, msg):
+    time.sleep(t)
+    print(msg)
+    sys.stdout.flush()
+
+
+class ExecutorShutdownTest:
+    def test_run_after_shutdown(self):
+        self.executor.shutdown()
+        self.assertRaises(RuntimeError,
+                          self.executor.submit,
+                          pow, 2, 5)
+
+    def test_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            from concurrent.futures import {executor_type}
+            from time import sleep
+            from test.test_concurrent_futures.test_shutdown import sleep_and_print
+            if __name__ == "__main__":
+                context = '{context}'
+                if context == "":
+                    t = {executor_type}(5)
+                else:
+                    from multiprocessing import get_context
+                    context = get_context(context)
+                    t = {executor_type}(5, mp_context=context)
+                t.submit(sleep_and_print, 1.0, "apple")
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, "ctx", "")))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
+    def test_submit_after_interpreter_shutdown(self):
+        # Test the atexit hook for shutdown of worker threads and processes
+        rc, out, err = assert_python_ok('-c', """if 1:
+            import atexit
+            @atexit.register
+            def run_last():
+                try:
+                    t.submit(id, None)
+                except RuntimeError:
+                    print("runtime-error")
+                    raise
+            from concurrent.futures import {executor_type}
+            if __name__ == "__main__":
+                context = '{context}'
+                if not context:
+                    t = {executor_type}(5)
+                else:
+                    from multiprocessing import get_context
+                    context = get_context(context)
+                    t = {executor_type}(5, mp_context=context)
+                    t.submit(id, 42).result()
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, "ctx", "")))
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
+        self.assertEqual(out.strip(), b"runtime-error")
+
+    def test_hang_issue12364(self):
+        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+        self.executor.shutdown()
+        for f in fs:
+            f.result()
+
+    def test_cancel_futures(self):
+        assert self.worker_count <= 5, "test needs few workers"
+        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+        self.executor.shutdown(cancel_futures=True)
+        # We can't guarantee the exact number of cancellations, but we can
+        # guarantee that *some* were cancelled. With few workers, many of
+        # the submitted futures should have been cancelled.
+        cancelled = [fut for fut in fs if fut.cancelled()]
+        self.assertGreater(len(cancelled), 20)
+
+        # Ensure the other futures were able to finish.
+        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
+        # that may have been left in a pending state.
+        others = [fut for fut in fs if not fut.cancelled()]
+        for fut in others:
+            self.assertTrue(fut.done(), msg=f"{fut._state=}")
+            self.assertIsNone(fut.exception())
+
+        # Similar to the number of cancelled futures, we can't guarantee the
+        # exact number that completed. But, we can guarantee that at least
+        # one finished.
+        self.assertGreater(len(others), 0)
+
+    def test_hang_gh83386(self):
+        """shutdown(wait=False) doesn't hang at exit with running futures.
+
+        See https://github.com/python/cpython/issues/83386.
+        """
+        if self.executor_type == futures.ProcessPoolExecutor:
+            raise unittest.SkipTest(
+                "Hangs, see https://github.com/python/cpython/issues/83386")
+
+        rc, out, err = assert_python_ok('-c', """if True:
+            from concurrent.futures import {executor_type}
+            from test.test_concurrent_futures.test_shutdown import sleep_and_print
+            if __name__ == "__main__":
+                if {context!r}: multiprocessing.set_start_method({context!r})
+                t = {executor_type}(max_workers=3)
+                t.submit(sleep_and_print, 1.0, "apple")
+                t.shutdown(wait=False)
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, 'ctx', None)))
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
+    def test_hang_gh94440(self):
+        """shutdown(wait=True) doesn't hang when a future was submitted and
+        quickly canceled right before shutdown.
+
+        See https://github.com/python/cpython/issues/94440.
+        """
+        if not hasattr(signal, 'alarm'):
+            raise unittest.SkipTest(
+                "Tested platform does not support the alarm signal")
+
+        def timeout(_signum, _frame):
+            raise RuntimeError("timed out waiting for shutdown")
+
+        kwargs = {}
+        if getattr(self, 'ctx', None):
+            kwargs['mp_context'] = self.get_context()
+        executor = self.executor_type(max_workers=1, **kwargs)
+        executor.submit(int).result()
+        old_handler = signal.signal(signal.SIGALRM, timeout)
+        try:
+            signal.alarm(5)
+            executor.submit(int).cancel()
+            executor.shutdown(wait=True)
+        finally:
+            signal.alarm(0)
+            signal.signal(signal.SIGALRM, old_handler)
+
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
+    def test_threads_terminate(self):
+        def acquire_lock(lock):
+            lock.acquire()
+
+        sem = threading.Semaphore(0)
+        for i in range(3):
+            self.executor.submit(acquire_lock, sem)
+        self.assertEqual(len(self.executor._threads), 3)
+        for i in range(3):
+            sem.release()
+        self.executor.shutdown()
+        for t in self.executor._threads:
+            t.join()
+
+    def test_context_manager_shutdown(self):
+        with futures.ThreadPoolExecutor(max_workers=5) as e:
+            executor = e
+            self.assertEqual(list(e.map(abs, range(-5, 5))),
+                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+        for t in executor._threads:
+            t.join()
+
+    def test_del_shutdown(self):
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        del executor
+
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
+    def test_thread_names_assigned(self):
+        executor = futures.ThreadPoolExecutor(
+            max_workers=5, thread_name_prefix='SpecialPool')
+        executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        del executor
+        support.gc_collect()  # For PyPy or other GCs.
+
+        for t in threads:
+            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
+            t.join()
+
+    def test_thread_names_default(self):
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        del executor
+        support.gc_collect()  # For PyPy or other GCs.
+
+        for t in threads:
+            # Ensure that our default name is reasonably sane and unique when
+            # no thread_name_prefix was supplied.
+            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
+            t.join()
+
+    def test_cancel_futures_wait_false(self):
+        # Can only be reliably tested for TPE, since PPE often hangs with
+        # `wait=False` (even without *cancel_futures*).
+        rc, out, err = assert_python_ok('-c', """if True:
+            from concurrent.futures import ThreadPoolExecutor
+            from test.test_concurrent_futures.test_shutdown import sleep_and_print
+            if __name__ == "__main__":
+                t = ThreadPoolExecutor()
+                t.submit(sleep_and_print, .1, "apple")
+                t.shutdown(wait=False, cancel_futures=True)
+            """)
+        # Errors in atexit hooks don't change the process exit code, check
+        # stderr manually.
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b"apple")
+
+
+class ProcessPoolShutdownTest(ExecutorShutdownTest):
+    def test_processes_terminate(self):
+        def acquire_lock(lock):
+            lock.acquire()
+
+        mp_context = self.get_context()
+        if mp_context.get_start_method(allow_none=False) == "fork":
+            # fork pre-spawns, not on demand.
+            expected_num_processes = self.worker_count
+        else:
+            expected_num_processes = 3
+
+        sem = mp_context.Semaphore(0)
+        for _ in range(3):
+            self.executor.submit(acquire_lock, sem)
+        self.assertEqual(len(self.executor._processes), expected_num_processes)
+        for _ in range(3):
+            sem.release()
+        processes = self.executor._processes
+        self.executor.shutdown()
+
+        for p in processes.values():
+            p.join()
+
+    def test_context_manager_shutdown(self):
+        with futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context()) as e:
+            processes = e._processes
+            self.assertEqual(list(e.map(abs, range(-5, 5))),
+                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+        for p in processes.values():
+            p.join()
+
+    def test_del_shutdown(self):
+        executor = futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context())
+        res = executor.map(abs, range(-5, 5))
+        executor_manager_thread = executor._executor_manager_thread
+        processes = executor._processes
+        call_queue = executor._call_queue
+        del executor
+        support.gc_collect()  # For PyPy or other GCs.
+
+        # Make sure that all the executor resources were properly cleaned up
+        # by the shutdown process.
+        executor_manager_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the
+        # executor was shut down.
+        assert all(r == abs(v) for r, v in zip(res, range(-5, 5)))
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
+        executor = futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context())
+        res = executor.map(abs, range(-5, 5))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        executor_manager_thread = executor._executor_manager_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned up
+        # by the shutdown process.
+        executor_manager_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the executor was
+        # shut down.
+        assert all(r == abs(v) for r, v in zip(res, range(-5, 5)))
+
+
+create_executor_tests(globals(), ProcessPoolShutdownTest,
+                      executor_mixins=(ProcessPoolForkMixin,
+                                       ProcessPoolForkserverMixin,
+                                       ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py
new file mode 100644 (file)
index 0000000..812f989
--- /dev/null
@@ -0,0 +1,100 @@
+import contextlib
+import multiprocessing as mp
+import multiprocessing.process
+import multiprocessing.util
+import os
+import threading
+import unittest
+from concurrent import futures
+from test import support
+
+from .executor import ExecutorTest, mul
+from .util import BaseTestCase, ThreadPoolMixin, setup_module
+
+
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
+    def test_map_submits_without_iteration(self):
+        """Tests verifying issue 11777."""
+        finished = []
+        def record_finished(n):
+            finished.append(n)
+
+        self.executor.map(record_finished, range(10))
+        self.executor.shutdown(wait=True)
+        self.assertCountEqual(finished, range(10))
+
+    def test_default_workers(self):
+        executor = self.executor_type()
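+        # min(32, os.cpu_count() + 4) is the documented ThreadPoolExecutor
+        # default for max_workers since Python 3.8.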
+        expected = min(32, (os.cpu_count() or 1) + 4)
+        self.assertEqual(executor._max_workers, expected)
+
+    def test_saturation(self):
+        executor = self.executor_type(4)
+        def acquire_lock(lock):
+            lock.acquire()
+
+        sem = threading.Semaphore(0)
+        for i in range(15 * executor._max_workers):
+            executor.submit(acquire_lock, sem)
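+        # Even with a 15x backlog of blocked tasks, the pool must not exceed
+        # max_workers threads.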
+        self.assertEqual(len(executor._threads), executor._max_workers)
+        for i in range(15 * executor._max_workers):
+            sem.release()
+        executor.shutdown(wait=True)
+
+    def test_idle_thread_reuse(self):
+        executor = self.executor_type()
+        executor.submit(mul, 21, 2).result()
+        executor.submit(mul, 6, 7).result()
+        executor.submit(mul, 3, 14).result()
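+        # Each submission finished before the next one was made, so a single
+        # idle thread should have been reused for all three.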
+        self.assertEqual(len(executor._threads), 1)
+        executor.shutdown(wait=True)
+
+    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
+    @support.requires_resource('cpu')
+    def test_hang_global_shutdown_lock(self):
+        # bpo-45021: _global_shutdown_lock should be reinitialized in the
+        # child process; otherwise the child will never exit.
+        def submit(pool):
+            pool.submit(submit, pool)
+
+        with futures.ThreadPoolExecutor(1) as pool:
+            pool.submit(submit, pool)
+
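+            # Repeatedly fork process-pool workers while the thread pool is
+            # busy; if a child inherited the lock in a locked state, it would
+            # hang at exit.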
+            for _ in range(50):
+                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
+                    workers.submit(tuple)
+
+    def test_executor_map_current_future_cancel(self):
+        stop_event = threading.Event()
+        log = []
+
+        def log_n_wait(ident):
+            log.append(f"{ident=} started")
+            try:
+                stop_event.wait()
+            finally:
+                log.append(f"{ident=} stopped")
+
+        with self.executor_type(max_workers=1) as pool:
+            # submit work to saturate the pool
+            fut = pool.submit(log_n_wait, ident="first")
+            try:
+                with contextlib.closing(
+                    pool.map(log_n_wait, ["second", "third"], timeout=0)
+                ) as gen:
+                    with self.assertRaises(TimeoutError):
+                        next(gen)
+            finally:
+                stop_event.set()
+            fut.result()
+        # ident='second' is cancelled as a result of raising a TimeoutError
+        # ident='third' is cancelled because it remained in the collection of futures
+        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
index 3f64ca173c02f64ff9aa1b37639b508ba1170947..e4bea8b05aced6eab5000d856d42d125356a2a0a 100644 (file)
@@ -3,7 +3,6 @@ import threading
 import time
 import unittest
 from concurrent import futures
-from test import support
 
 from .util import (
     CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@@ -54,7 +53,6 @@ class WaitTests:
                 finished)
         self.assertEqual(set([future1]), pending)
 
-    @support.requires_resource('walltime')
     def test_first_exception(self):
         future1 = self.executor.submit(mul, 2, 21)
         future2 = self.executor.submit(sleep_and_raise, 1.5)
@@ -112,7 +110,6 @@ class WaitTests:
                               future2]), finished)
         self.assertEqual(set(), pending)
 
-    @support.requires_resource('walltime')
     def test_timeout(self):
         future1 = self.executor.submit(mul, 6, 7)
         future2 = self.executor.submit(time.sleep, 6)
diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py
new file mode 100644 (file)
index 0000000..dc48bec
--- /dev/null
@@ -0,0 +1,141 @@
+import multiprocessing
+import sys
+import time
+import unittest
+from concurrent import futures
+from concurrent.futures._base import (
+    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
+    )
+from concurrent.futures.process import _check_system_limits
+
+from test import support
+from test.support import threading_helper
+
+
+def create_future(state=PENDING, exception=None, result=None):
+    f = Future()
+    f._state = state
+    f._exception = exception
+    f._result = result
+    return f
+
+
+PENDING_FUTURE = create_future(state=PENDING)
+RUNNING_FUTURE = create_future(state=RUNNING)
+CANCELLED_FUTURE = create_future(state=CANCELLED)
+CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
+SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._thread_key = threading_helper.threading_setup()
+
+    def tearDown(self):
+        support.reap_children()
+        threading_helper.threading_cleanup(*self._thread_key)
+
+
+class ExecutorMixin:
+    worker_count = 5
+    executor_kwargs = {}
+
+    def setUp(self):
+        super().setUp()
+
+        self.t1 = time.monotonic()
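+        # Only the process-pool mixins define "ctx"; ThreadPoolExecutor
+        # takes no mp_context argument.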
+        if hasattr(self, "ctx"):
+            self.executor = self.executor_type(
+                max_workers=self.worker_count,
+                mp_context=self.get_context(),
+                **self.executor_kwargs)
+        else:
+            self.executor = self.executor_type(
+                max_workers=self.worker_count,
+                **self.executor_kwargs)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+        self.executor = None
+
+        dt = time.monotonic() - self.t1
+        if support.verbose:
+            print("%.2fs" % dt, end=' ')
+        self.assertLess(dt, 300, "synchronization issue: test lasted too long")
+
+        super().tearDown()
+
+    def get_context(self):
+        return multiprocessing.get_context(self.ctx)
+
+
+class ThreadPoolMixin(ExecutorMixin):
+    executor_type = futures.ThreadPoolExecutor
+
+
+class ProcessPoolForkMixin(ExecutorMixin):
+    executor_type = futures.ProcessPoolExecutor
+    ctx = "fork"
+
+    def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
+        if sys.platform == "win32":
+            self.skipTest("require unix system")
+        return super().get_context()
+
+
+class ProcessPoolSpawnMixin(ExecutorMixin):
+    executor_type = futures.ProcessPoolExecutor
+    ctx = "spawn"
+
+    def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
+        return super().get_context()
+
+
+class ProcessPoolForkserverMixin(ExecutorMixin):
+    executor_type = futures.ProcessPoolExecutor
+    ctx = "forkserver"
+
+    def get_context(self):
+        try:
+            _check_system_limits()
+        except NotImplementedError:
+            self.skipTest("ProcessPoolExecutor unavailable on this system")
+        if sys.platform == "win32":
+            self.skipTest("require unix system")
+        return super().get_context()
+
+
+def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
+                          executor_mixins=(ThreadPoolMixin,
+                                           ProcessPoolForkMixin,
+                                           ProcessPoolForkserverMixin,
+                                           ProcessPoolSpawnMixin)):
+    def strip_mixin(name):
+        if name.endswith(('Mixin', 'Tests')):
+            return name[:-5]
+        elif name.endswith('Test'):
+            return name[:-4]
+        else:
+            return name
+
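+    # Combine the test mixin with each executor mixin into a concrete
+    # TestCase and publish it in the caller's globals so that unittest
+    # discovery picks it up.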
+    module = remote_globals['__name__']
+    for exe in executor_mixins:
+        name = ("%s%sTest"
+                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
+        cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module})
+        remote_globals[name] = cls
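+    # For example, the create_executor_tests() call in test_shutdown.py above
+    # produces ProcessPoolForkProcessPoolShutdownTest,
+    # ProcessPoolForkserverProcessPoolShutdownTest and
+    # ProcessPoolSpawnProcessPoolShutdownTest.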
+
+
+def setup_module():
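+    # Register module-level cleanups: release multiprocessing's test
+    # resources and verify that no threads leaked.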
+    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
+    thread_info = threading_helper.threading_setup()
+    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
diff --git a/Misc/NEWS.d/next/Tests/2023-08-24-06-10-36.gh-issue-108388.YCVB0D.rst b/Misc/NEWS.d/next/Tests/2023-08-24-06-10-36.gh-issue-108388.YCVB0D.rst
new file mode 100644 (file)
index 0000000..ddff07b
--- /dev/null
@@ -0,0 +1,2 @@
+Convert test_concurrent_futures to a package of 7 sub-tests. Patch by Victor
+Stinner.