]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
test/process: run test_multi_process in a clean subprocess 3624/head
authormokashang <shangmengjiajiajia@gmail.com>
Tue, 19 May 2026 16:22:52 +0000 (09:22 -0700)
committermokashang <shangmengjiajiajia@gmail.com>
Tue, 19 May 2026 16:22:52 +0000 (09:22 -0700)
When `python3 -m tornado.test` is run in an environment where some test
earlier in the suite has left a thread running, the `os.fork()` inside
`fork_processes()` triggers `DeprecationWarning: This process (pid=...)
is multi-threaded, use of fork() may lead to deadlocks in the child` on
Python 3.12+. The test suite turns DeprecationWarnings from tornado
into errors, so `test_multi_process` then fails. This has been observed
in the Fedora rpm build of tornado on Python 3.15.0b1 (#3623), where it
does not reproduce under tox.

Rather than chase down every thread leak across the suite, isolate
`test_multi_process` so it always starts from a single-threaded state.
The actual fork-and-serve logic is moved into a script string that is
executed via `python -c` in a fresh interpreter, following the pattern
established in autoreload_test for tests that need a clean process. The
outer test method just launches the subprocess and asserts a clean
exit. PYTHONPATH is propagated so the source tree under test is
importable. The script keeps the existing `signal.alarm(5)` timers and
`subprocess.run(timeout=30)` is added as a backstop in case the script
hangs in a way the alarms don't catch.

Tested locally on macOS / Python 3.13 with the full suite plus a
deliberately leaked thread before the test to confirm the new isolation
holds. The `tearDown` / `get_app` helpers and the `asyncio`, `logging`,
and HTTP-related top-level imports are no longer needed and are
removed.

Fixes #3623

tornado/test/process_test.py

index 693d2fd76940a2c130851484aeae093a03c4f7a0..775f99a3f5ff6632187e2c5b3627540696515910 100644 (file)
@@ -1,5 +1,3 @@
-import asyncio
-import logging
 import os
 import signal
 import subprocess
@@ -7,129 +5,163 @@ import sys
 import time
 import unittest
 
+from tornado.process import Subprocess
+from tornado.test.util import skipIfNonUnix
+from tornado.testing import AsyncTestCase, gen_test
+
+# Body of the multi-process test, factored out so it can be launched in a
+# clean Python subprocess. fork_processes() calls os.fork(), which raises
+# DeprecationWarning on Python 3.12+ if the process has more than one
+# thread. Running here in a fresh interpreter avoids picking up threads
+# left running by earlier tests in the suite (e.g. the default asyncio
+# DNS resolver's thread pool), which would otherwise cause the test
+# suite's warnings-as-errors configuration to fail this test.
+_MULTI_PROCESS_TEST_SCRIPT = """\
+import asyncio
+import logging
+import os
+import signal
+import sys
+
 from tornado.httpclient import HTTPClient, HTTPError
 from tornado.httpserver import HTTPServer
 from tornado.log import gen_log
-from tornado.process import Subprocess, fork_processes, task_id
+from tornado.process import fork_processes, task_id
 from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.test.util import skipIfNonUnix
-from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
+from tornado.testing import ExpectLog, bind_unused_port
 from tornado.web import Application, RequestHandler
 
 
+class ProcessHandler(RequestHandler):
+    def get(self):
+        if self.get_argument("exit", None):
+            # must use os._exit instead of sys.exit so unittest's
+            # exception handler doesn't catch it
+            os._exit(int(self.get_argument("exit")))
+        if self.get_argument("signal", None):
+            os.kill(os.getpid(), int(self.get_argument("signal")))
+        self.write(str(os.getpid()))
+
+
+def main():
+    # This test doesn't work on twisted because we use the global
+    # reactor and don't restore it to a sane state after the fork
+    # (asyncio has the same issue, but we have a special case in
+    # place for it).
+    with ExpectLog(
+        gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
+    ):
+        sock, port = bind_unused_port()
+
+        def get_url(path):
+            return "http://127.0.0.1:%d%s" % (port, path)
+
+        # ensure that none of these processes live too long
+        signal.alarm(5)  # master process
+        try:
+            id = fork_processes(3, max_restarts=3)
+            assert id is not None
+            signal.alarm(5)  # child processes
+        except SystemExit as e:
+            # if we exit cleanly from fork_processes, all the child processes
+            # finished with status 0
+            assert e.code == 0, "fork_processes exited with %r" % (e.code,)
+            assert task_id() is None
+            sock.close()
+            return
+        try:
+            if id in (0, 1):
+                assert id == task_id()
+
+                async def f():
+                    server = HTTPServer(Application([("/", ProcessHandler)]))
+                    server.add_sockets([sock])
+                    await asyncio.Event().wait()
+
+                asyncio.run(f())
+            elif id == 2:
+                assert id == task_id()
+                sock.close()
+                # Always use SimpleAsyncHTTPClient here; the curl
+                # version appears to get confused sometimes if the
+                # connection gets closed before it's had a chance to
+                # switch from writing mode to reading mode.
+                client = HTTPClient(SimpleAsyncHTTPClient)
+
+                def fetch(url, fail_ok=False):
+                    try:
+                        return client.fetch(get_url(url))
+                    except HTTPError as e:
+                        if not (fail_ok and e.code == 599):
+                            raise
+
+                # Make two processes exit abnormally
+                fetch("/?exit=2", fail_ok=True)
+                fetch("/?exit=3", fail_ok=True)
+
+                # They've been restarted, so a new fetch will work
+                int(fetch("/").body)
+
+                # Now the same with signals
+                # Disabled because on the mac a process dying with a signal
+                # can trigger an "Application exited abnormally; send error
+                # report to Apple?" prompt.
+                # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
+                # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
+                # int(fetch("/").body)
+
+                # Now kill them normally so they won't be restarted
+                fetch("/?exit=0", fail_ok=True)
+                # One process left; watch it's pid change
+                pid = int(fetch("/").body)
+                fetch("/?exit=4", fail_ok=True)
+                pid2 = int(fetch("/").body)
+                assert pid != pid2
+
+                # Kill the last one so we shut down cleanly
+                fetch("/?exit=0", fail_ok=True)
+
+                os._exit(0)
+        except Exception:
+            logging.error("exception in child process %d", id, exc_info=True)
+            raise
+
+
+if __name__ == "__main__":
+    main()
+"""
+
+
 # Not using AsyncHTTPTestCase because we need control over the IOLoop.
 @skipIfNonUnix
 class ProcessTest(unittest.TestCase):
-    def get_app(self):
-        class ProcessHandler(RequestHandler):
-            def get(self):
-                if self.get_argument("exit", None):
-                    # must use os._exit instead of sys.exit so unittest's
-                    # exception handler doesn't catch it
-                    os._exit(int(self.get_argument("exit")))
-                if self.get_argument("signal", None):
-                    os.kill(os.getpid(), int(self.get_argument("signal")))
-                self.write(str(os.getpid()))
-
-        return Application([("/", ProcessHandler)])
-
-    def tearDown(self):
-        if task_id() is not None:
-            # We're in a child process, and probably got to this point
-            # via an uncaught exception.  If we return now, both
-            # processes will continue with the rest of the test suite.
-            # Exit now so the parent process will restart the child
-            # (since we don't have a clean way to signal failure to
-            # the parent that won't restart)
-            logging.error("aborting child process from tearDown")
-            logging.shutdown()
-            os._exit(1)
-        # In the surviving process, clear the alarm we set earlier
-        signal.alarm(0)
-        super().tearDown()
-
     def test_multi_process(self):
-        # This test doesn't work on twisted because we use the global
-        # reactor and don't restore it to a sane state after the fork
-        # (asyncio has the same issue, but we have a special case in
-        # place for it).
-        with ExpectLog(
-            gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
-        ):
-            sock, port = bind_unused_port()
-
-            def get_url(path):
-                return "http://127.0.0.1:%d%s" % (port, path)
-
-            # ensure that none of these processes live too long
-            signal.alarm(5)  # master process
-            try:
-                id = fork_processes(3, max_restarts=3)
-                self.assertIsNotNone(id)
-                signal.alarm(5)  # child processes
-            except SystemExit as e:
-                # if we exit cleanly from fork_processes, all the child processes
-                # finished with status 0
-                self.assertEqual(e.code, 0)
-                self.assertIsNone(task_id())
-                sock.close()
-                return
-            try:
-                if id in (0, 1):
-                    self.assertEqual(id, task_id())
-
-                    async def f():
-                        server = HTTPServer(self.get_app())
-                        server.add_sockets([sock])
-                        await asyncio.Event().wait()
-
-                    asyncio.run(f())
-                elif id == 2:
-                    self.assertEqual(id, task_id())
-                    sock.close()
-                    # Always use SimpleAsyncHTTPClient here; the curl
-                    # version appears to get confused sometimes if the
-                    # connection gets closed before it's had a chance to
-                    # switch from writing mode to reading mode.
-                    client = HTTPClient(SimpleAsyncHTTPClient)
-
-                    def fetch(url, fail_ok=False):
-                        try:
-                            return client.fetch(get_url(url))
-                        except HTTPError as e:
-                            if not (fail_ok and e.code == 599):
-                                raise
-
-                    # Make two processes exit abnormally
-                    fetch("/?exit=2", fail_ok=True)
-                    fetch("/?exit=3", fail_ok=True)
-
-                    # They've been restarted, so a new fetch will work
-                    int(fetch("/").body)
-
-                    # Now the same with signals
-                    # Disabled because on the mac a process dying with a signal
-                    # can trigger an "Application exited abnormally; send error
-                    # report to Apple?" prompt.
-                    # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
-                    # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
-                    # int(fetch("/").body)
-
-                    # Now kill them normally so they won't be restarted
-                    fetch("/?exit=0", fail_ok=True)
-                    # One process left; watch it's pid change
-                    pid = int(fetch("/").body)
-                    fetch("/?exit=4", fail_ok=True)
-                    pid2 = int(fetch("/").body)
-                    self.assertNotEqual(pid, pid2)
-
-                    # Kill the last one so we shut down cleanly
-                    fetch("/?exit=0", fail_ok=True)
-
-                    os._exit(0)
-            except Exception:
-                logging.error("exception in child process %d", id, exc_info=True)
-                raise
+        # Run the test body in a fresh interpreter so fork_processes()
+        # starts from a single-threaded state. See the comment on
+        # _MULTI_PROCESS_TEST_SCRIPT.
+        parts = [os.getcwd()]
+        if "PYTHONPATH" in os.environ:
+            parts += os.environ["PYTHONPATH"].split(os.pathsep)
+        env = dict(os.environ, PYTHONPATH=os.pathsep.join(parts))
+
+        result = subprocess.run(
+            [sys.executable, "-c", _MULTI_PROCESS_TEST_SCRIPT],
+            env=env,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            timeout=30,
+        )
+        if result.returncode != 0:
+            self.fail(
+                "test_multi_process subprocess exited with status %d\n"
+                "----- stdout -----\n%s"
+                "----- stderr -----\n%s"
+                % (
+                    result.returncode,
+                    result.stdout.decode("utf-8", errors="replace"),
+                    result.stderr.decode("utf-8", errors="replace"),
+                )
+            )
 
 
 @skipIfNonUnix