Multiprocessing and concurrent.futures tests now stop the resource
tracker process when tests complete.
Add ResourceTracker._stop() method to
multiprocessing.resource_tracker.
Add _cleanup_tests() helper function to multiprocessing.util: share
code between multiprocessing and concurrent.futures tests.
(cherry picked from commit
9707e8e22d80ca97bf7a9812816701cecde6d226)
self._fd = None
self._pid = None
+ def _stop(self):
+ with self._lock:
+ if self._fd is None:
+ # not running
+ return
+
+ # closing the "alive" file descriptor stops main()
+ os.close(self._fd)
+ self._fd = None
+
+ os.waitpid(self._pid, 0)
+ self._pid = None
+
def getfd(self):
self.ensure_running()
return self._fd
"""Close each file descriptor given as an argument"""
for fd in fds:
os.close(fd)
+
+
+def _cleanup_tests():
+ """Cleanup multiprocessing resources when multiprocessing tests
+ completed."""
+
+ from test import support
+
+ # cleanup multiprocessing
+ process._cleanup()
+
+ # Stop the ForkServer process if it's running
+ from multiprocessing import forkserver
+ forkserver._forkserver._stop()
+
+ # Stop the ResourceTracker process if it's running
+ from multiprocessing import resource_tracker
+ resource_tracker._resource_tracker._stop()
+
+ # bpo-37421: Explicitly call _run_finalizers() to remove immediately
+ # temporary directories created by multiprocessing.util.get_temp_dir().
+ _run_finalizers()
+ support.gc_collect()
+
+ support.reap_children()
if need_sleep:
time.sleep(0.5)
- multiprocessing.process._cleanup()
-
- # Stop the ForkServer process if it's running
- from multiprocessing import forkserver
- forkserver._forkserver._stop()
-
- # bpo-37421: Explicitly call _run_finalizers() to remove immediately
- # temporary directories created by multiprocessing.util.get_temp_dir().
- multiprocessing.util._run_finalizers()
- test.support.gc_collect()
+ multiprocessing.util._cleanup_tests()
remote_globs['setUpModule'] = setUpModule
remote_globs['tearDownModule'] = tearDownModule
def tearDownModule():
test.support.threading_cleanup(*_threads_key)
- test.support.reap_children()
-
- # cleanup multiprocessing
- multiprocessing.process._cleanup()
- # Stop the ForkServer process if it's running
- from multiprocessing import forkserver
- forkserver._forkserver._stop()
- # bpo-37421: Explicitly call _run_finalizers() to remove immediately
- # temporary directories created by multiprocessing.util.get_temp_dir().
- multiprocessing.util._run_finalizers()
- test.support.gc_collect()
+ multiprocessing.util._cleanup_tests()
if __name__ == "__main__":
--- /dev/null
+Multiprocessing and concurrent.futures tests now stop the resource tracker
+process when tests complete.