-import asyncio
-import logging
import os
import signal
import subprocess
import time
import unittest
+from tornado.process import Subprocess
+from tornado.test.util import skipIfNonUnix
+from tornado.testing import AsyncTestCase, gen_test
+
+# Body of the multi-process test, factored out so it can be launched in a
+# clean Python subprocess. fork_processes() calls os.fork(), which raises
+# DeprecationWarning on Python 3.12+ if the process has more than one
+# thread. Running here in a fresh interpreter avoids picking up threads
+# left running by earlier tests in the suite (e.g. the default asyncio
+# DNS resolver's thread pool), which would otherwise cause the test
+# suite's warnings-as-errors configuration to fail this test.
+_MULTI_PROCESS_TEST_SCRIPT = """\
+import asyncio
+import logging
+import os
+import signal
+import sys
+
from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.log import gen_log
-from tornado.process import Subprocess, fork_processes, task_id
+from tornado.process import fork_processes, task_id
from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.test.util import skipIfNonUnix
-from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
+from tornado.testing import ExpectLog, bind_unused_port
from tornado.web import Application, RequestHandler
+class ProcessHandler(RequestHandler):
+ def get(self):
+ if self.get_argument("exit", None):
+ # must use os._exit instead of sys.exit so unittest's
+ # exception handler doesn't catch it
+ os._exit(int(self.get_argument("exit")))
+ if self.get_argument("signal", None):
+ os.kill(os.getpid(), int(self.get_argument("signal")))
+ self.write(str(os.getpid()))
+
+
+def main():
+ # This test doesn't work on twisted because we use the global
+ # reactor and don't restore it to a sane state after the fork
+ # (asyncio has the same issue, but we have a special case in
+ # place for it).
+ with ExpectLog(
+ gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
+ ):
+ sock, port = bind_unused_port()
+
+ def get_url(path):
+ return "http://127.0.0.1:%d%s" % (port, path)
+
+ # ensure that none of these processes live too long
+ signal.alarm(5) # master process
+ try:
+ id = fork_processes(3, max_restarts=3)
+ assert id is not None
+ signal.alarm(5) # child processes
+ except SystemExit as e:
+ # if we exit cleanly from fork_processes, all the child processes
+ # finished with status 0
+ assert e.code == 0, "fork_processes exited with %r" % (e.code,)
+ assert task_id() is None
+ sock.close()
+ return
+ try:
+ if id in (0, 1):
+ assert id == task_id()
+
+ async def f():
+ server = HTTPServer(Application([("/", ProcessHandler)]))
+ server.add_sockets([sock])
+ await asyncio.Event().wait()
+
+ asyncio.run(f())
+ elif id == 2:
+ assert id == task_id()
+ sock.close()
+ # Always use SimpleAsyncHTTPClient here; the curl
+ # version appears to get confused sometimes if the
+ # connection gets closed before it's had a chance to
+ # switch from writing mode to reading mode.
+ client = HTTPClient(SimpleAsyncHTTPClient)
+
+ def fetch(url, fail_ok=False):
+ try:
+ return client.fetch(get_url(url))
+ except HTTPError as e:
+ if not (fail_ok and e.code == 599):
+ raise
+
+ # Make two processes exit abnormally
+ fetch("/?exit=2", fail_ok=True)
+ fetch("/?exit=3", fail_ok=True)
+
+ # They've been restarted, so a new fetch will work
+ int(fetch("/").body)
+
+ # Now the same with signals
+ # Disabled because on the mac a process dying with a signal
+ # can trigger an "Application exited abnormally; send error
+ # report to Apple?" prompt.
+ # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
+ # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
+ # int(fetch("/").body)
+
+ # Now kill them normally so they won't be restarted
+ fetch("/?exit=0", fail_ok=True)
+ # One process left; watch it's pid change
+ pid = int(fetch("/").body)
+ fetch("/?exit=4", fail_ok=True)
+ pid2 = int(fetch("/").body)
+ assert pid != pid2
+
+ # Kill the last one so we shut down cleanly
+ fetch("/?exit=0", fail_ok=True)
+
+ os._exit(0)
+ except Exception:
+ logging.error("exception in child process %d", id, exc_info=True)
+ raise
+
+
+if __name__ == "__main__":
+ main()
+"""
+
+
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
@skipIfNonUnix
class ProcessTest(unittest.TestCase):
- def get_app(self):
- class ProcessHandler(RequestHandler):
- def get(self):
- if self.get_argument("exit", None):
- # must use os._exit instead of sys.exit so unittest's
- # exception handler doesn't catch it
- os._exit(int(self.get_argument("exit")))
- if self.get_argument("signal", None):
- os.kill(os.getpid(), int(self.get_argument("signal")))
- self.write(str(os.getpid()))
-
- return Application([("/", ProcessHandler)])
-
- def tearDown(self):
- if task_id() is not None:
- # We're in a child process, and probably got to this point
- # via an uncaught exception. If we return now, both
- # processes will continue with the rest of the test suite.
- # Exit now so the parent process will restart the child
- # (since we don't have a clean way to signal failure to
- # the parent that won't restart)
- logging.error("aborting child process from tearDown")
- logging.shutdown()
- os._exit(1)
- # In the surviving process, clear the alarm we set earlier
- signal.alarm(0)
- super().tearDown()
-
def test_multi_process(self):
- # This test doesn't work on twisted because we use the global
- # reactor and don't restore it to a sane state after the fork
- # (asyncio has the same issue, but we have a special case in
- # place for it).
- with ExpectLog(
- gen_log, "(Starting .* processes|child .* exited|uncaught exception)"
- ):
- sock, port = bind_unused_port()
-
- def get_url(path):
- return "http://127.0.0.1:%d%s" % (port, path)
-
- # ensure that none of these processes live too long
- signal.alarm(5) # master process
- try:
- id = fork_processes(3, max_restarts=3)
- self.assertIsNotNone(id)
- signal.alarm(5) # child processes
- except SystemExit as e:
- # if we exit cleanly from fork_processes, all the child processes
- # finished with status 0
- self.assertEqual(e.code, 0)
- self.assertIsNone(task_id())
- sock.close()
- return
- try:
- if id in (0, 1):
- self.assertEqual(id, task_id())
-
- async def f():
- server = HTTPServer(self.get_app())
- server.add_sockets([sock])
- await asyncio.Event().wait()
-
- asyncio.run(f())
- elif id == 2:
- self.assertEqual(id, task_id())
- sock.close()
- # Always use SimpleAsyncHTTPClient here; the curl
- # version appears to get confused sometimes if the
- # connection gets closed before it's had a chance to
- # switch from writing mode to reading mode.
- client = HTTPClient(SimpleAsyncHTTPClient)
-
- def fetch(url, fail_ok=False):
- try:
- return client.fetch(get_url(url))
- except HTTPError as e:
- if not (fail_ok and e.code == 599):
- raise
-
- # Make two processes exit abnormally
- fetch("/?exit=2", fail_ok=True)
- fetch("/?exit=3", fail_ok=True)
-
- # They've been restarted, so a new fetch will work
- int(fetch("/").body)
-
- # Now the same with signals
- # Disabled because on the mac a process dying with a signal
- # can trigger an "Application exited abnormally; send error
- # report to Apple?" prompt.
- # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
- # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
- # int(fetch("/").body)
-
- # Now kill them normally so they won't be restarted
- fetch("/?exit=0", fail_ok=True)
- # One process left; watch it's pid change
- pid = int(fetch("/").body)
- fetch("/?exit=4", fail_ok=True)
- pid2 = int(fetch("/").body)
- self.assertNotEqual(pid, pid2)
-
- # Kill the last one so we shut down cleanly
- fetch("/?exit=0", fail_ok=True)
-
- os._exit(0)
- except Exception:
- logging.error("exception in child process %d", id, exc_info=True)
- raise
+ # Run the test body in a fresh interpreter so fork_processes()
+ # starts from a single-threaded state. See the comment on
+ # _MULTI_PROCESS_TEST_SCRIPT.
+ parts = [os.getcwd()]
+ if "PYTHONPATH" in os.environ:
+ parts += os.environ["PYTHONPATH"].split(os.pathsep)
+ env = dict(os.environ, PYTHONPATH=os.pathsep.join(parts))
+
+ result = subprocess.run(
+ [sys.executable, "-c", _MULTI_PROCESS_TEST_SCRIPT],
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ timeout=30,
+ )
+ if result.returncode != 0:
+ self.fail(
+ "test_multi_process subprocess exited with status %d\n"
+ "----- stdout -----\n%s"
+ "----- stderr -----\n%s"
+ % (
+ result.returncode,
+ result.stdout.decode("utf-8", errors="replace"),
+ result.stderr.decode("utf-8", errors="replace"),
+ )
+ )
@skipIfNonUnix