From: Ben Darnell Date: Tue, 5 Jul 2011 02:11:32 +0000 (-0700) Subject: Better fork_processes: Restart processes when they exit abnormally. Tests! X-Git-Tag: v2.1.0~119 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5ef04877085853298ccbb2bac066f0210cc0a18c;p=thirdparty%2Ftornado.git Better fork_processes: Restart processes when they exit abnormally. Tests! --- diff --git a/tornado/process.py b/tornado/process.py index 0bd4bde7c..2672227c3 100644 --- a/tornado/process.py +++ b/tornado/process.py @@ -16,6 +16,7 @@ """Utilities for working with multiple processes.""" +import errno import logging import os import sys @@ -58,36 +59,86 @@ def _reseed_random(): random.seed(seed) -_processes_forked = False +_task_id = None -def fork_processes(num_processes): +def fork_processes(num_processes, max_restarts=100): """Starts multiple worker processes. - If num_processes is None or <= 0, we detect the number of cores + If ``num_processes`` is None or <= 0, we detect the number of cores available on this machine and fork that number of child - processes. If num_processes is given and > 0, we fork that + processes. If ``num_processes`` is given and > 0, we fork that specific number of sub-processes. Since we use processes and not threads, there is no shared memory between any server code. Note that multiple processes are not compatible with the autoreload - module (or the debug=True option to tornado.web.Application). + module (or the debug=True option to `tornado.web.Application`). When using multiple processes, no IOLoops can be created or - referenced until after the call to fork_processes. + referenced until after the call to ``fork_processes``. + + In each child process, ``fork_processes`` returns its *task id*, a + number between 0 and ``num_processes``. Processes that exit + abnormally (due to a signal or non-zero exit status) are restarted + with the same id (up to ``max_restarts`` times). In the parent + process, ``fork_processes`` returns None if all child processes + have exited normally, but will otherwise only exit by throwing an + exception. """ - global _processes_forked - assert not _processes_forked - _processes_forked = True + global _task_id + assert _task_id is None if num_processes is None or num_processes <= 0: num_processes = cpu_count() if ioloop.IOLoop.initialized(): raise RuntimeError("Cannot run in multiple processes: IOLoop instance " "has already been initialized. You cannot call " "IOLoop.instance() before calling start_processes()") - logging.info("Starting %d server processes", num_processes) - for i in range(num_processes): - if os.fork() == 0: + logging.info("Starting %d processes", num_processes) + children = {} + def start_child(i): + pid = os.fork() + if pid == 0: + # child process _reseed_random() - return - os.waitpid(-1, 0) + global _task_id + _task_id = i + return i + else: + children[pid] = i + return None + for i in range(num_processes): + id = start_child(i) + if id is not None: return id + num_restarts = 0 + while children: + try: + pid, status = os.wait() + except OSError, e: + if e.errno == errno.EINTR: + continue + raise + if pid not in children: + continue + id = children.pop(pid) + if os.WIFSIGNALED(status): + logging.warning("child %d (pid %d) killed by signal %d, restarting", + id, pid, os.WTERMSIG(status)) + elif os.WEXITSTATUS(status) != 0: + logging.warning("child %d (pid %d) exited with status %d, restarting", + id, pid, os.WEXITSTATUS(status)) + else: + logging.info("child %d (pid %d) exited normally", id, pid) + continue + num_restarts += 1 + if num_restarts > max_restarts: + raise RuntimeError("Too many child restarts, giving up") + new_id = start_child(id) + if new_id is not None: return new_id + +def task_id(): + """Returns the current task id, if any. + + Returns None if this process was not created by `fork_processes`. + """ + global _task_id + return _task_id diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py new file mode 100644 index 000000000..35d3325b1 --- /dev/null +++ b/tornado/test/process_test.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python + +import functools +import os +import signal +from tornado.httpclient import HTTPClient +from tornado.httpserver import HTTPServer +from tornado.ioloop import IOLoop +from tornado.netutil import bind_sockets +from tornado.process import fork_processes, task_id +from tornado.testing import LogTrapTestCase, get_unused_port +from tornado.web import RequestHandler, Application + +# Not using AsyncHTTPTestCase because we need control over the IOLoop. +class ProcessTest(LogTrapTestCase): + def get_app(self): + class ProcessHandler(RequestHandler): + def get(self): + if self.get_argument("exit", None): + # must use os._exit instead of sys.exit so unittest's + # exception handler doesn't catch it + IOLoop.instance().add_callback(functools.partial( + os._exit, int(self.get_argument("exit")))) + if self.get_argument("signal", None): + IOLoop.instance().add_callback(functools.partial( + os.kill, os.getpid(), + int(self.get_argument("signal")))) + self.write(str(os.getpid())) + return Application([("/", ProcessHandler)]) + + def tearDown(self): + if task_id() is not None: + # We're in a child process, and probably got to this point + # via an uncaught exception. If we return now, both + # processes will continue with the rest of the test suite. + # Exit now so the parent process will restart the child + # (since we don't have a clean way to signal failure to + # the parent that won't restart) + os._exit(1) + super(ProcessTest, self).tearDown() + + def test_multi_process(self): + self.assertFalse(IOLoop.initialized()) + port = get_unused_port() + def get_url(path): + return "http://127.0.0.1:%d%s" % (port, path) + sockets = bind_sockets(port, "127.0.0.1") + # ensure that none of these processes live too long + signal.alarm(5) # master process + id = fork_processes(3, max_restarts=3) + if id is None: + # back in the master process; everything worked! + self.assertTrue(task_id() is None) + for sock in sockets: sock.close() + signal.alarm(0) + return + signal.alarm(5) # child process + if id in (0, 1): + signal.alarm(5) + self.assertEqual(id, task_id()) + server = HTTPServer(self.get_app()) + server.add_sockets(sockets) + IOLoop.instance().start() + elif id == 2: + signal.alarm(5) + self.assertEqual(id, task_id()) + for sock in sockets: sock.close() + client = HTTPClient() + + # Make both processes exit abnormally + client.fetch(get_url("/?exit=2")) + client.fetch(get_url("/?exit=3")) + + # They've been restarted, so a new fetch will work + int(client.fetch(get_url("/")).body) + + # Now the same with signals + # Disabled because on the mac a process dying with a signal + # can trigger an "Application exited abnormally; send error + # report to Apple?" prompt. + #client.fetch(get_url("/?signal=%d" % signal.SIGTERM)) + #client.fetch(get_url("/?signal=%d" % signal.SIGABRT)) + #int(client.fetch(get_url("/")).body) + + # Now kill them normally so they won't be restarted + client.fetch(get_url("/?exit=0")) + # One process left; watch it's pid change + pid = int(client.fetch(get_url("/")).body) + client.fetch(get_url("/?exit=1")) + pid2 = int(client.fetch(get_url("/")).body) + self.assertNotEqual(pid, pid2) + + # Kill the last one so we shut down cleanly + client.fetch(get_url("/?exit=0")) + + os._exit(0) + + diff --git a/tornado/test/runtests.py b/tornado/test/runtests.py index e1b1a4f53..7f6d3b68a 100755 --- a/tornado/test/runtests.py +++ b/tornado/test/runtests.py @@ -13,6 +13,7 @@ TEST_MODULES = [ 'tornado.test.import_test', 'tornado.test.ioloop_test', 'tornado.test.iostream_test', + 'tornado.test.process_test', 'tornado.test.simple_httpclient_test', 'tornado.test.stack_context_test', 'tornado.test.template_test',