"""Utilities for working with multiple processes."""
+import errno
import logging
import os
import sys
random.seed(seed)
-_processes_forked = False
+_task_id = None
-def fork_processes(num_processes):
+def fork_processes(num_processes, max_restarts=100):
"""Starts multiple worker processes.
- If num_processes is None or <= 0, we detect the number of cores
+ If ``num_processes`` is None or <= 0, we detect the number of cores
available on this machine and fork that number of child
- processes. If num_processes is given and > 0, we fork that
+ processes. If ``num_processes`` is given and > 0, we fork that
specific number of sub-processes.
Since we use processes and not threads, there is no shared memory
between any server code.
Note that multiple processes are not compatible with the autoreload
- module (or the debug=True option to tornado.web.Application).
+ module (or the debug=True option to `tornado.web.Application`).
When using multiple processes, no IOLoops can be created or
- referenced until after the call to fork_processes.
+ referenced until after the call to ``fork_processes``.
+
+    In each child process, ``fork_processes`` returns its *task id*, a
+    number between 0 and ``num_processes - 1``.  Processes that exit
+ abnormally (due to a signal or non-zero exit status) are restarted
+ with the same id (up to ``max_restarts`` times). In the parent
+ process, ``fork_processes`` returns None if all child processes
+ have exited normally, but will otherwise only exit by throwing an
+ exception.
"""
- global _processes_forked
- assert not _processes_forked
- _processes_forked = True
+ global _task_id
+ assert _task_id is None
if num_processes is None or num_processes <= 0:
num_processes = cpu_count()
if ioloop.IOLoop.initialized():
raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
"has already been initialized. You cannot call "
"IOLoop.instance() before calling start_processes()")
- logging.info("Starting %d server processes", num_processes)
- for i in range(num_processes):
- if os.fork() == 0:
+ logging.info("Starting %d processes", num_processes)
+ children = {}
+ def start_child(i):
+ pid = os.fork()
+ if pid == 0:
+ # child process
_reseed_random()
- return
- os.waitpid(-1, 0)
+ global _task_id
+ _task_id = i
+ return i
+ else:
+ children[pid] = i
+ return None
+ for i in range(num_processes):
+ id = start_child(i)
+ if id is not None: return id
+ num_restarts = 0
+ while children:
+ try:
+ pid, status = os.wait()
+ except OSError, e:
+ if e.errno == errno.EINTR:
+ continue
+ raise
+ if pid not in children:
+ continue
+ id = children.pop(pid)
+ if os.WIFSIGNALED(status):
+ logging.warning("child %d (pid %d) killed by signal %d, restarting",
+ id, pid, os.WTERMSIG(status))
+ elif os.WEXITSTATUS(status) != 0:
+ logging.warning("child %d (pid %d) exited with status %d, restarting",
+ id, pid, os.WEXITSTATUS(status))
+ else:
+ logging.info("child %d (pid %d) exited normally", id, pid)
+ continue
+ num_restarts += 1
+ if num_restarts > max_restarts:
+ raise RuntimeError("Too many child restarts, giving up")
+ new_id = start_child(id)
+ if new_id is not None: return new_id
+
+def task_id():
+ """Returns the current task id, if any.
+
+ Returns None if this process was not created by `fork_processes`.
+ """
+ global _task_id
+ return _task_id
--- /dev/null
+++ b/tornado/test/process_test.py
+#!/usr/bin/env python
+
+import functools
+import os
+import signal
+from tornado.httpclient import HTTPClient
+from tornado.httpserver import HTTPServer
+from tornado.ioloop import IOLoop
+from tornado.netutil import bind_sockets
+from tornado.process import fork_processes, task_id
+from tornado.testing import LogTrapTestCase, get_unused_port
+from tornado.web import RequestHandler, Application
+
+# Not using AsyncHTTPTestCase because we need control over the IOLoop.
+class ProcessTest(LogTrapTestCase):
+ def get_app(self):
+ class ProcessHandler(RequestHandler):
+ def get(self):
+ if self.get_argument("exit", None):
+ # must use os._exit instead of sys.exit so unittest's
+ # exception handler doesn't catch it
+ IOLoop.instance().add_callback(functools.partial(
+ os._exit, int(self.get_argument("exit"))))
+ if self.get_argument("signal", None):
+ IOLoop.instance().add_callback(functools.partial(
+ os.kill, os.getpid(),
+ int(self.get_argument("signal"))))
+ self.write(str(os.getpid()))
+ return Application([("/", ProcessHandler)])
+
+ def tearDown(self):
+ if task_id() is not None:
+ # We're in a child process, and probably got to this point
+ # via an uncaught exception. If we return now, both
+ # processes will continue with the rest of the test suite.
+ # Exit now so the parent process will restart the child
+ # (since we don't have a clean way to signal failure to
+ # the parent that won't restart)
+ os._exit(1)
+ super(ProcessTest, self).tearDown()
+
+ def test_multi_process(self):
+ self.assertFalse(IOLoop.initialized())
+ port = get_unused_port()
+ def get_url(path):
+ return "http://127.0.0.1:%d%s" % (port, path)
+ sockets = bind_sockets(port, "127.0.0.1")
+ # ensure that none of these processes live too long
+ signal.alarm(5) # master process
+ id = fork_processes(3, max_restarts=3)
+ if id is None:
+ # back in the master process; everything worked!
+ self.assertTrue(task_id() is None)
+ for sock in sockets: sock.close()
+ signal.alarm(0)
+ return
+ signal.alarm(5) # child process
+ if id in (0, 1):
+ signal.alarm(5)
+ self.assertEqual(id, task_id())
+ server = HTTPServer(self.get_app())
+ server.add_sockets(sockets)
+ IOLoop.instance().start()
+ elif id == 2:
+ signal.alarm(5)
+ self.assertEqual(id, task_id())
+ for sock in sockets: sock.close()
+ client = HTTPClient()
+
+ # Make both processes exit abnormally
+ client.fetch(get_url("/?exit=2"))
+ client.fetch(get_url("/?exit=3"))
+
+ # They've been restarted, so a new fetch will work
+ int(client.fetch(get_url("/")).body)
+
+ # Now the same with signals
+ # Disabled because on the mac a process dying with a signal
+ # can trigger an "Application exited abnormally; send error
+ # report to Apple?" prompt.
+ #client.fetch(get_url("/?signal=%d" % signal.SIGTERM))
+ #client.fetch(get_url("/?signal=%d" % signal.SIGABRT))
+ #int(client.fetch(get_url("/")).body)
+
+ # Now kill them normally so they won't be restarted
+ client.fetch(get_url("/?exit=0"))
+        # One process left; watch its pid change
+ pid = int(client.fetch(get_url("/")).body)
+ client.fetch(get_url("/?exit=1"))
+ pid2 = int(client.fetch(get_url("/")).body)
+ self.assertNotEqual(pid, pid2)
+
+ # Kill the last one so we shut down cleanly
+ client.fetch(get_url("/?exit=0"))
+
+ os._exit(0)
+
+
'tornado.test.import_test',
'tornado.test.ioloop_test',
'tornado.test.iostream_test',
+ 'tornado.test.process_test',
'tornado.test.simple_httpclient_test',
'tornado.test.stack_context_test',
'tornado.test.template_test',