]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)
authortzickel <tzickel@users.noreply.github.com>
Wed, 3 Oct 2018 11:50:04 +0000 (14:50 +0300)
committerAntoine Pitrou <pitrou@free.fr>
Wed, 3 Oct 2018 11:50:04 +0000 (13:50 +0200)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.

Lib/multiprocessing/pool.py
Lib/test/test_multiprocessing.py
Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst [new file with mode: 0644]

index a47cd0f58a05af04affbc7b37fe42337cd101d64..489c7d67cf3496aec50d83be0b0841e177da12d6 100644 (file)
@@ -162,7 +162,9 @@ class Pool(object):
 
         self._worker_handler = threading.Thread(
             target=Pool._handle_workers,
-            args=(self, )
+            args=(self._cache, self._processes, self._pool, self.Process,
+                  self._inqueue, self._outqueue, self._initializer,
+                  self._initargs, self._maxtasksperchild, self._taskqueue)
             )
         self._worker_handler.daemon = True
         self._worker_handler._state = RUN
@@ -194,42 +196,56 @@ class Pool(object):
             exitpriority=15
             )
 
-    def _join_exited_workers(self):
+    @staticmethod
+    def _join_exited_workers(pool):
         """Cleanup after any worker processes which have exited due to reaching
         their specified lifetime.  Returns True if any workers were cleaned up.
         """
         cleaned = False
-        for i in reversed(range(len(self._pool))):
-            worker = self._pool[i]
+        for i in reversed(range(len(pool))):
+            worker = pool[i]
             if worker.exitcode is not None:
                 # worker exited
                 debug('cleaning up worker %d' % i)
                 worker.join()
                 cleaned = True
-                del self._pool[i]
+                del pool[i]
         return cleaned
 
     def _repopulate_pool(self):
+        return self._repopulate_pool_static(self._processes, self._pool,
+                                            self.Process, self._inqueue,
+                                            self._outqueue, self._initializer,
+                                            self._initargs,
+                                            self._maxtasksperchild)
+
+    @staticmethod
+    def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
+                                initializer, initargs, maxtasksperchild):
         """Bring the number of pool processes up to the specified number,
         for use after reaping workers which have exited.
         """
-        for i in range(self._processes - len(self._pool)):
-            w = self.Process(target=worker,
-                             args=(self._inqueue, self._outqueue,
-                                   self._initializer,
-                                   self._initargs, self._maxtasksperchild)
-                            )
-            self._pool.append(w)
+        for i in range(processes - len(pool)):
+            w = Process(target=worker,
+                        args=(inqueue, outqueue,
+                              initializer,
+                              initargs, maxtasksperchild)
+                       )
+            pool.append(w)
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
             debug('added worker')
 
-    def _maintain_pool(self):
+    @staticmethod
+    def _maintain_pool(processes, pool, Process, inqueue, outqueue,
+                       initializer, initargs, maxtasksperchild):
         """Clean up any exited workers and start replacements for them.
         """
-        if self._join_exited_workers():
-            self._repopulate_pool()
+        if Pool._join_exited_workers(pool):
+            Pool._repopulate_pool_static(processes, pool, Process, inqueue,
+                                         outqueue, initializer, initargs,
+                                         maxtasksperchild)
 
     def _setup_queues(self):
         from .queues import SimpleQueue
@@ -319,16 +335,18 @@ class Pool(object):
         return result
 
     @staticmethod
-    def _handle_workers(pool):
+    def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
+                        initializer, initargs, maxtasksperchild, taskqueue):
         thread = threading.current_thread()
 
         # Keep maintaining workers until the cache gets drained, unless the pool
         # is terminated.
-        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
-            pool._maintain_pool()
+        while thread._state == RUN or (cache and thread._state != TERMINATE):
+            Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
+                                initializer, initargs, maxtasksperchild)
             time.sleep(0.1)
         # send sentinel to stop workers
-        pool._taskqueue.put(None)
+        taskqueue.put(None)
         debug('worker handler exiting')
 
     @staticmethod
index ff299feed8948ee6328e73df1445fab66af4093e..d3192181e5adedda2968f0d97d6bf7006d5b0656 100644 (file)
@@ -1359,6 +1359,13 @@ class _TestPool(BaseTestCase):
         # they were released too.
         self.assertEqual(CountedObject.n_instances, 0)
 
+    def test_del_pool(self):
+        p = self.Pool(1)
+        wr = weakref.ref(p)
+        del p
+        gc.collect()
+        self.assertIsNone(wr())
+
 
 def unpickleable_result():
     return lambda: 42
diff --git a/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
new file mode 100644 (file)
index 0000000..d1c5a77
--- /dev/null
@@ -0,0 +1 @@
+Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.