self._worker_handler = threading.Thread(
target=Pool._handle_workers,
- args=(self._cache, self._processes, self._pool, self.Process,
- self._inqueue, self._outqueue, self._initializer,
- self._initargs, self._maxtasksperchild, self._taskqueue)
+ args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
exitpriority=15
)
- @staticmethod
- def _join_exited_workers(pool):
+ def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
- for i in reversed(range(len(pool))):
- worker = pool[i]
+ for i in reversed(range(len(self._pool))):
+ worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
- del pool[i]
+ del self._pool[i]
return cleaned
def _repopulate_pool(self):
- return self._repopulate_pool_static(self._processes, self._pool,
- self.Process, self._inqueue,
- self._outqueue, self._initializer,
- self._initargs,
- self._maxtasksperchild)
-
- @staticmethod
- def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
- initializer, initargs, maxtasksperchild):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
- for i in range(processes - len(pool)):
- w = Process(target=worker,
- args=(inqueue, outqueue,
- initializer,
- initargs, maxtasksperchild)
- )
- pool.append(w)
+ for i in range(self._processes - len(self._pool)):
+ w = self.Process(target=worker,
+ args=(self._inqueue, self._outqueue,
+ self._initializer,
+ self._initargs, self._maxtasksperchild)
+ )
+ self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
debug('added worker')
- @staticmethod
- def _maintain_pool(processes, pool, Process, inqueue, outqueue,
- initializer, initargs, maxtasksperchild):
+ def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
"""
- if Pool._join_exited_workers(pool):
- Pool._repopulate_pool_static(processes, pool, Process, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild)
+ if self._join_exited_workers():
+ self._repopulate_pool()
def _setup_queues(self):
from .queues import SimpleQueue
return result
@staticmethod
- def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
- initializer, initargs, maxtasksperchild, taskqueue):
+ def _handle_workers(pool):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
- while thread._state == RUN or (cache and thread._state != TERMINATE):
- Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
- initializer, initargs, maxtasksperchild)
+ while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
+ pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
- taskqueue.put(None)
+ pool._taskqueue.put(None)
debug('worker handler exiting')
@staticmethod