PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool, _check_system_limits
-from multiprocessing import get_context
import multiprocessing.process
import multiprocessing.util
+import multiprocessing as mp
if support.check_sanitizer(address=True, memory=True):
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
- self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
super().tearDown()
def get_context(self):
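+ # Single point for building the multiprocessing context from the
+ # mixin's configured start method (self.ctx); tests should use this
+ # instead of calling multiprocessing.get_context() directly.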
- return get_context(self.ctx)
-
- def _prime_executor(self):
- # Make sure that the executor is ready to do work before running the
- # tests. This should reduce the probability of timeouts in the tests.
- futures = [self.executor.submit(time.sleep, 0.1)
- for _ in range(self.worker_count)]
- for f in futures:
- f.result()
+ return mp.get_context(self.ctx)
class ThreadPoolMixin(ExecutorMixin):
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)
- def _prime_executor(self):
- pass
-
@contextlib.contextmanager
def _assert_logged(self, msg):
if self.log_queue is not None:
f.result()
def test_cancel_futures(self):
- executor = self.executor_type(max_workers=3)
- fs = [executor.submit(time.sleep, .1) for _ in range(50)]
- executor.shutdown(cancel_futures=True)
+ assert self.worker_count <= 5, "test needs few workers"
+ fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+ self.executor.shutdown(cancel_futures=True)
# We can't guarantee the exact number of cancellations, but we can
- # guarantee that *some* were cancelled. With setting max_workers to 3,
- # most of the submitted futures should have been cancelled.
+ # guarantee that *some* were cancelled. With few workers, many of
+ # the submitted futures should have been cancelled.
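+ # (Rough arithmetic, an assumption rather than a strict bound: at most
+ # worker_count futures are running when shutdown() is called, and only
+ # a few more can start before the pending queue is drained, leaving
+ # well over 20 of the 50 to be cancelled.)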
cancelled = [fut for fut in fs if fut.cancelled()]
- self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+ self.assertGreater(len(cancelled), 20)
# Ensure the other futures were able to finish.
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
# Similar to the number of cancelled futures, we can't guarantee the
# exact number that completed. But we can guarantee that at least
# one finished.
- self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+ self.assertGreater(len(others), 0)
- def test_hang_issue39205(self):
+ def test_hang_gh83386(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.
- See https://bugs.python.org/issue39205.
+ See https://github.com/python/cpython/issues/83386.
"""
if self.executor_type == futures.ProcessPoolExecutor:
raise unittest.SkipTest(
- "Hangs due to https://bugs.python.org/issue39205")
+ "Hangs, see https://github.com/python/cpython/issues/83386")
rc, out, err = assert_python_ok('-c', """if True:
from concurrent.futures import {executor_type}
from test.test_concurrent_futures import sleep_and_print
if __name__ == "__main__":
+ import multiprocessing
+ if {context!r}: multiprocessing.set_start_method({context!r})
t = {executor_type}(max_workers=3)
t.submit(sleep_and_print, 1.0, "apple")
t.shutdown(wait=False)
- """.format(executor_type=self.executor_type.__name__))
+ """.format(executor_type=self.executor_type.__name__,
+ context=getattr(self, 'ctx', None)))
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
- def _prime_executor(self):
- pass
-
def test_threads_terminate(self):
def acquire_lock(lock):
lock.acquire()
class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def _prime_executor(self):
- pass
-
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
- mp_context = get_context()
+ mp_context = self.get_context()
sem = mp_context.Semaphore(0)
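+ # The semaphore starts at zero, so each acquire_lock call blocks its
+ # worker; three live worker processes are guaranteed before the
+ # termination behaviour is checked.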
for _ in range(3):
self.executor.submit(acquire_lock, sem)
p.join()
def test_context_manager_shutdown(self):
- with futures.ProcessPoolExecutor(max_workers=5) as e:
+ with futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context()) as e:
processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
p.join()
def test_del_shutdown(self):
- executor = futures.ProcessPoolExecutor(max_workers=5)
+ executor = futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
executor_manager_thread = executor._executor_manager_thread
processes = executor._processes
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the processes when calling
# shutdown with wait=False
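+ # The process map, call queue and manager thread are captured before
+ # shutdown(wait=False) returns so the test can still join them and
+ # verify cleanup afterwards.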
- executor = futures.ProcessPoolExecutor(max_workers=5)
+ executor = futures.ProcessPoolExecutor(
+ max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
processes = executor._processes
call_queue = executor._call_queue
pool.submit(submit, pool)
for _ in range(50):
- with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
+ with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)
def test_ressources_gced_in_workers(self):
# Ensure that arguments for a job are correctly gc-ed after the job
# is finished
- mgr = get_context(self.ctx).Manager()
+ mgr = self.get_context().Manager()
obj = EventfulGCObj(mgr)
future = self.executor.submit(id, obj)
future.result()
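+ # EventfulGCObj is expected to signal a manager-backed event from its
+ # __del__, letting the test observe that the worker's copy of obj was
+ # collected once the job finished.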
mgr.join()
def test_saturation(self):
- executor = self.executor_type(4)
- mp_context = get_context()
+ executor = self.executor
+ mp_context = self.get_context()
sem = mp_context.Semaphore(0)
job_count = 15 * executor._max_workers
- try:
- for _ in range(job_count):
- executor.submit(sem.acquire)
- self.assertEqual(len(executor._processes), executor._max_workers)
- for _ in range(job_count):
- sem.release()
- finally:
- executor.shutdown()
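+ # Each sem.acquire() blocks the worker that runs it, so after
+ # job_count submissions the pool must have spawned all of its
+ # max_workers processes. The try/finally and explicit shutdown are
+ # gone because tearDown now shuts self.executor down.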
+ for _ in range(job_count):
+ executor.submit(sem.acquire)
+ self.assertEqual(len(executor._processes), executor._max_workers)
+ for _ in range(job_count):
+ sem.release()
def test_idle_process_reuse_one(self):
- executor = self.executor_type(4)
+ executor = self.executor
+ assert executor._max_workers >= 4
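+ # Jobs are submitted one at a time and waited on, so a single idle
+ # worker can service all of them; requiring max_workers >= 4 ensures
+ # the single-process result comes from reuse, not from the limit.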
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._processes), 1)
- executor.shutdown()
def test_idle_process_reuse_multiple(self):
- executor = self.executor_type(4)
+ executor = self.executor
+ assert executor._max_workers <= 5
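+ # Unwaited submissions overlap with waited ones, so more than one
+ # worker may spawn, but idle reuse should keep the count at three or
+ # fewer despite six submissions.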
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
executor.submit(mul, 18, 29)
- self.assertLessEqual(len(executor._processes), 2)
+ executor.submit(mul, 1, 2).result()
+ executor.submit(mul, 0, 9)
+ self.assertLessEqual(len(executor._processes), 3)
executor.shutdown()
def test_max_tasks_per_child(self):
- executor = self.executor_type(1, max_tasks_per_child=3)
+ # Not using self.executor as we need to control construction.
+ # Arguably this could go in another class without that mixin.
+ executor = self.executor_type(
+ 1, mp_context=self.get_context(), max_tasks_per_child=3)
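+ # With max_tasks_per_child=3, the single worker exits after running
+ # three tasks and the pool replaces it with a fresh process (new pid).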
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid remains the same as the worker could be reused
executor.shutdown()
def test_max_tasks_early_shutdown(self):
- executor = self.executor_type(3, max_tasks_per_child=1)
+ # Not using self.executor as we need to control construction.
+ # Arguably this could go in another class without that mixin.
+ executor = self.executor_type(
+ 3, mp_context=self.get_context(), max_tasks_per_child=1)
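+ # max_tasks_per_child=1 forces a worker replacement after every task,
+ # so shutdown may race with workers being recycled; all six results
+ # must still be delivered.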
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
self.executor.shutdown(wait=True)
executor = self.executor_type(
- max_workers=2, mp_context=get_context(self.ctx))
+ max_workers=2, mp_context=self.get_context())
res = executor.submit(func, *args)
if ignore_stderr:
# if a worker fails after the shutdown call.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
- mp_context=get_context(self.ctx)) as executor:
+ mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
# Reported in bpo-39104.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
- mp_context=get_context(self.ctx)) as executor:
+ mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
# Start the executor and get the executor_manager_thread to collect