# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item
+ # attempt to increment idle process count
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._idle_worker_semaphore.release()
+ del executor
+
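A minimal sketch of the accounting pattern used above, reduced to plain threads (the names _idle_workers, _on_result_received and _maybe_spawn_worker are illustrative, not CPython internals): the manager thread release()s the semaphore once per finished job, and the submit path spawns a new process only when a non-blocking acquire() finds no idle worker.

import threading

_idle_workers = threading.Semaphore(0)   # starts at 0: no worker is idle yet
_worker_count = 0

def _on_result_received():
    # manager-thread side: the worker that produced this result is idle again
    _idle_workers.release()

def _maybe_spawn_worker(max_workers):
    # submit side: reuse an idle worker if one exists, otherwise spawn
    global _worker_count
    if _idle_workers.acquire(blocking=False):
        return
    if _worker_count < max_workers:
        _worker_count += 1           # stand-in for starting a real process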
if self.is_shutting_down():
self.flag_executor_shutting_down()
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
+ self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
- self._adjust_process_count()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
- for _ in range(len(self._processes), self._max_workers):
+ # if there's an idle process, we don't need to spawn a new one.
+ if self._idle_worker_semaphore.acquire(blocking=False):
+ return
+
+ process_count = len(self._processes)
+ if process_count < self._max_workers:
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
+ self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
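With _adjust_process_count() now invoked from submit(), worker processes are created on demand, one per submission that finds no idle worker, capped at max_workers. A usage sketch of the intended effect, assuming this patch is applied (square is a made-up helper, and _processes is a private attribute inspected only for illustration):

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=8) as pool:
        # Waiting for each result before the next submit leaves one worker
        # idle, so the pool keeps reusing it instead of starting all eight
        # processes up front.
        for i in range(5):
            pool.submit(square, i).result()
        print(len(pool._processes))   # expected: 1 with on-demand spawning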
pass
def test_processes_terminate(self):
- self.executor.submit(mul, 21, 2)
- self.executor.submit(mul, 6, 7)
- self.executor.submit(mul, 3, 14)
- self.assertEqual(len(self.executor._processes), 5)
+ def acquire_lock(lock):
+ lock.acquire()
+
+ mp_context = get_context()
+ sem = mp_context.Semaphore(0)
+ for _ in range(3):
+ self.executor.submit(acquire_lock, sem)
+ self.assertEqual(len(self.executor._processes), 3)
+ for _ in range(3):
+ sem.release()
processes = self.executor._processes
self.executor.shutdown()
mgr.shutdown()
mgr.join()
+ def test_saturation(self):
+ executor = self.executor_type(4)
+ mp_context = get_context()
+ sem = mp_context.Semaphore(0)
+ job_count = 15 * executor._max_workers
+ try:
+ for _ in range(job_count):
+ executor.submit(sem.acquire)
+ # even with many more queued jobs than workers, the pool must not
+ # grow past max_workers
+ self.assertEqual(len(executor._processes), executor._max_workers)
+ for _ in range(job_count):
+ sem.release()
+ finally:
+ executor.shutdown()
+
+ def test_idle_process_reuse_one(self):
+ executor = self.executor_type(4)
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._processes), 1)
+ executor.shutdown()
+
+ def test_idle_process_reuse_multiple(self):
+ executor = self.executor_type(4)
+ executor.submit(mul, 12, 7).result()
+ executor.submit(mul, 33, 25)
+ executor.submit(mul, 25, 26).result()
+ executor.submit(mul, 18, 29)
+ # the jobs whose results are not awaited may still be running when the
+ # next one is submitted, so a second worker may be spawned, never more
+ self.assertLessEqual(len(executor._processes), 2)
+ executor.shutdown()
create_executor_tests(ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,