From: mokashang Date: Tue, 19 May 2026 16:22:52 +0000 (-0700) Subject: test/process: run test_multi_process in a clean subprocess X-Git-Url: http://git.ipfire.org/gitweb/index.cgi?a=commitdiff_plain;h=d6e55b5a65958abde3f63afae5f43d384a6c8efe;p=thirdparty%2Ftornado.git test/process: run test_multi_process in a clean subprocess When `python3 -m tornado.test` is run in an environment where some test earlier in the suite has left a thread running, the `os.fork()` inside `fork_processes()` triggers `DeprecationWarning: This process (pid=...) is multi-threaded, use of fork() may lead to deadlocks in the child` on Python 3.12+. The test suite turns DeprecationWarnings from tornado into errors, so `test_multi_process` then fails. This has been observed in the Fedora rpm build of tornado on Python 3.15.0b1 (#3623), where it does not reproduce under tox. Rather than chase down every thread leak across the suite, isolate `test_multi_process` so it always starts from a single-threaded state. The actual fork-and-serve logic is moved into a script string that is executed via `python -c` in a fresh interpreter, following the pattern established in autoreload_test for tests that need a clean process. The outer test method just launches the subprocess and asserts a clean exit. PYTHONPATH is propagated so the source tree under test is importable. The script keeps the existing `signal.alarm(5)` timers and `subprocess.run(timeout=30)` is added as a backstop in case the script hangs in a way the alarms don't catch. Tested locally on macOS / Python 3.13 with the full suite plus a deliberately leaked thread before the test to confirm the new isolation holds. The `tearDown` / `get_app` helpers and the `asyncio`, `logging`, and HTTP-related top-level imports are no longer needed and are removed. Fixes #3623 --- diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py index 693d2fd7..775f99a3 100644 --- a/tornado/test/process_test.py +++ b/tornado/test/process_test.py @@ -1,5 +1,3 @@ -import asyncio -import logging import os import signal import subprocess @@ -7,129 +5,163 @@ import sys import time import unittest +from tornado.process import Subprocess +from tornado.test.util import skipIfNonUnix +from tornado.testing import AsyncTestCase, gen_test + +# Body of the multi-process test, factored out so it can be launched in a +# clean Python subprocess. fork_processes() calls os.fork(), which raises +# DeprecationWarning on Python 3.12+ if the process has more than one +# thread. Running here in a fresh interpreter avoids picking up threads +# left running by earlier tests in the suite (e.g. the default asyncio +# DNS resolver's thread pool), which would otherwise cause the test +# suite's warnings-as-errors configuration to fail this test. +_MULTI_PROCESS_TEST_SCRIPT = """\ +import asyncio +import logging +import os +import signal +import sys + from tornado.httpclient import HTTPClient, HTTPError from tornado.httpserver import HTTPServer from tornado.log import gen_log -from tornado.process import Subprocess, fork_processes, task_id +from tornado.process import fork_processes, task_id from tornado.simple_httpclient import SimpleAsyncHTTPClient -from tornado.test.util import skipIfNonUnix -from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test +from tornado.testing import ExpectLog, bind_unused_port from tornado.web import Application, RequestHandler +class ProcessHandler(RequestHandler): + def get(self): + if self.get_argument("exit", None): + # must use os._exit instead of sys.exit so unittest's + # exception handler doesn't catch it + os._exit(int(self.get_argument("exit"))) + if self.get_argument("signal", None): + os.kill(os.getpid(), int(self.get_argument("signal"))) + self.write(str(os.getpid())) + + +def main(): + # This test doesn't work on twisted because we use the global + # reactor and don't restore it to a sane state after the fork + # (asyncio has the same issue, but we have a special case in + # place for it). + with ExpectLog( + gen_log, "(Starting .* processes|child .* exited|uncaught exception)" + ): + sock, port = bind_unused_port() + + def get_url(path): + return "http://127.0.0.1:%d%s" % (port, path) + + # ensure that none of these processes live too long + signal.alarm(5) # master process + try: + id = fork_processes(3, max_restarts=3) + assert id is not None + signal.alarm(5) # child processes + except SystemExit as e: + # if we exit cleanly from fork_processes, all the child processes + # finished with status 0 + assert e.code == 0, "fork_processes exited with %r" % (e.code,) + assert task_id() is None + sock.close() + return + try: + if id in (0, 1): + assert id == task_id() + + async def f(): + server = HTTPServer(Application([("/", ProcessHandler)])) + server.add_sockets([sock]) + await asyncio.Event().wait() + + asyncio.run(f()) + elif id == 2: + assert id == task_id() + sock.close() + # Always use SimpleAsyncHTTPClient here; the curl + # version appears to get confused sometimes if the + # connection gets closed before it's had a chance to + # switch from writing mode to reading mode. + client = HTTPClient(SimpleAsyncHTTPClient) + + def fetch(url, fail_ok=False): + try: + return client.fetch(get_url(url)) + except HTTPError as e: + if not (fail_ok and e.code == 599): + raise + + # Make two processes exit abnormally + fetch("/?exit=2", fail_ok=True) + fetch("/?exit=3", fail_ok=True) + + # They've been restarted, so a new fetch will work + int(fetch("/").body) + + # Now the same with signals + # Disabled because on the mac a process dying with a signal + # can trigger an "Application exited abnormally; send error + # report to Apple?" prompt. + # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True) + # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True) + # int(fetch("/").body) + + # Now kill them normally so they won't be restarted + fetch("/?exit=0", fail_ok=True) + # One process left; watch it's pid change + pid = int(fetch("/").body) + fetch("/?exit=4", fail_ok=True) + pid2 = int(fetch("/").body) + assert pid != pid2 + + # Kill the last one so we shut down cleanly + fetch("/?exit=0", fail_ok=True) + + os._exit(0) + except Exception: + logging.error("exception in child process %d", id, exc_info=True) + raise + + +if __name__ == "__main__": + main() +""" + + # Not using AsyncHTTPTestCase because we need control over the IOLoop. @skipIfNonUnix class ProcessTest(unittest.TestCase): - def get_app(self): - class ProcessHandler(RequestHandler): - def get(self): - if self.get_argument("exit", None): - # must use os._exit instead of sys.exit so unittest's - # exception handler doesn't catch it - os._exit(int(self.get_argument("exit"))) - if self.get_argument("signal", None): - os.kill(os.getpid(), int(self.get_argument("signal"))) - self.write(str(os.getpid())) - - return Application([("/", ProcessHandler)]) - - def tearDown(self): - if task_id() is not None: - # We're in a child process, and probably got to this point - # via an uncaught exception. If we return now, both - # processes will continue with the rest of the test suite. - # Exit now so the parent process will restart the child - # (since we don't have a clean way to signal failure to - # the parent that won't restart) - logging.error("aborting child process from tearDown") - logging.shutdown() - os._exit(1) - # In the surviving process, clear the alarm we set earlier - signal.alarm(0) - super().tearDown() - def test_multi_process(self): - # This test doesn't work on twisted because we use the global - # reactor and don't restore it to a sane state after the fork - # (asyncio has the same issue, but we have a special case in - # place for it). - with ExpectLog( - gen_log, "(Starting .* processes|child .* exited|uncaught exception)" - ): - sock, port = bind_unused_port() - - def get_url(path): - return "http://127.0.0.1:%d%s" % (port, path) - - # ensure that none of these processes live too long - signal.alarm(5) # master process - try: - id = fork_processes(3, max_restarts=3) - self.assertIsNotNone(id) - signal.alarm(5) # child processes - except SystemExit as e: - # if we exit cleanly from fork_processes, all the child processes - # finished with status 0 - self.assertEqual(e.code, 0) - self.assertIsNone(task_id()) - sock.close() - return - try: - if id in (0, 1): - self.assertEqual(id, task_id()) - - async def f(): - server = HTTPServer(self.get_app()) - server.add_sockets([sock]) - await asyncio.Event().wait() - - asyncio.run(f()) - elif id == 2: - self.assertEqual(id, task_id()) - sock.close() - # Always use SimpleAsyncHTTPClient here; the curl - # version appears to get confused sometimes if the - # connection gets closed before it's had a chance to - # switch from writing mode to reading mode. - client = HTTPClient(SimpleAsyncHTTPClient) - - def fetch(url, fail_ok=False): - try: - return client.fetch(get_url(url)) - except HTTPError as e: - if not (fail_ok and e.code == 599): - raise - - # Make two processes exit abnormally - fetch("/?exit=2", fail_ok=True) - fetch("/?exit=3", fail_ok=True) - - # They've been restarted, so a new fetch will work - int(fetch("/").body) - - # Now the same with signals - # Disabled because on the mac a process dying with a signal - # can trigger an "Application exited abnormally; send error - # report to Apple?" prompt. - # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True) - # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True) - # int(fetch("/").body) - - # Now kill them normally so they won't be restarted - fetch("/?exit=0", fail_ok=True) - # One process left; watch it's pid change - pid = int(fetch("/").body) - fetch("/?exit=4", fail_ok=True) - pid2 = int(fetch("/").body) - self.assertNotEqual(pid, pid2) - - # Kill the last one so we shut down cleanly - fetch("/?exit=0", fail_ok=True) - - os._exit(0) - except Exception: - logging.error("exception in child process %d", id, exc_info=True) - raise + # Run the test body in a fresh interpreter so fork_processes() + # starts from a single-threaded state. See the comment on + # _MULTI_PROCESS_TEST_SCRIPT. + parts = [os.getcwd()] + if "PYTHONPATH" in os.environ: + parts += os.environ["PYTHONPATH"].split(os.pathsep) + env = dict(os.environ, PYTHONPATH=os.pathsep.join(parts)) + + result = subprocess.run( + [sys.executable, "-c", _MULTI_PROCESS_TEST_SCRIPT], + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=30, + ) + if result.returncode != 0: + self.fail( + "test_multi_process subprocess exited with status %d\n" + "----- stdout -----\n%s" + "----- stderr -----\n%s" + % ( + result.returncode, + result.stdout.decode("utf-8", errors="replace"), + result.stderr.decode("utf-8", errors="replace"), + ) + ) @skipIfNonUnix