]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Better fork_processes: Restart processes when they exit abnormally. Tests!
authorBen Darnell <ben@bendarnell.com>
Tue, 5 Jul 2011 02:11:32 +0000 (19:11 -0700)
committerBen Darnell <ben@bendarnell.com>
Tue, 5 Jul 2011 02:11:32 +0000 (19:11 -0700)
tornado/process.py
tornado/test/process_test.py [new file with mode: 0644]
tornado/test/runtests.py

index 0bd4bde7c6df4f3cf1c4a30687693e955b40865f..2672227c38f99ada00529605d49963fef87f8b0c 100644 (file)
@@ -16,6 +16,7 @@
 
 """Utilities for working with multiple processes."""
 
+import errno
 import logging
 import os
 import sys
@@ -58,36 +59,86 @@ def _reseed_random():
     random.seed(seed)
 
 
-_processes_forked = False
+_task_id = None
 
-def fork_processes(num_processes):
+def fork_processes(num_processes, max_restarts=100):
     """Starts multiple worker processes.
 
-    If num_processes is None or <= 0, we detect the number of cores
+    If ``num_processes`` is None or <= 0, we detect the number of cores
     available on this machine and fork that number of child
-    processes. If num_processes is given and > 0, we fork that
+    processes. If ``num_processes`` is given and > 0, we fork that
     specific number of sub-processes.
 
     Since we use processes and not threads, there is no shared memory
     between any server code.
 
     Note that multiple processes are not compatible with the autoreload
-    module (or the debug=True option to tornado.web.Application).
+    module (or the debug=True option to `tornado.web.Application`).
     When using multiple processes, no IOLoops can be created or
-    referenced until after the call to fork_processes.
+    referenced until after the call to ``fork_processes``.
+
+    In each child process, ``fork_processes`` returns its *task id*, a
+    number between 0 and ``num_processes - 1``.  Processes that exit
+    abnormally (due to a signal or non-zero exit status) are restarted
+    with the same id (up to ``max_restarts`` times).  In the parent
+    process, ``fork_processes`` returns None if all child processes
+    have exited normally, but will otherwise only exit by throwing an
+    exception.
     """
-    global _processes_forked
-    assert not _processes_forked
-    _processes_forked = True
+    global _task_id
+    assert _task_id is None
     if num_processes is None or num_processes <= 0:
         num_processes = cpu_count()
     if ioloop.IOLoop.initialized():
         raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                            "has already been initialized. You cannot call "
                            "IOLoop.instance() before calling start_processes()")
-    logging.info("Starting %d server processes", num_processes)
-    for i in range(num_processes):
-        if os.fork() == 0:
+    logging.info("Starting %d processes", num_processes)
+    children = {}
+    def start_child(i):
+        pid = os.fork()
+        if pid == 0:
+            # child process
             _reseed_random()
-            return
-    os.waitpid(-1, 0)
+            global _task_id
+            _task_id = i
+            return i
+        else:
+            children[pid] = i
+            return None
+    for i in range(num_processes):
+        id = start_child(i)
+        if id is not None: return id
+    num_restarts = 0
+    while children:
+        try:
+            pid, status = os.wait()
+        except OSError, e:
+            if e.errno == errno.EINTR:
+                continue
+            raise
+        if pid not in children:
+            continue
+        id = children.pop(pid)
+        if os.WIFSIGNALED(status):
+            logging.warning("child %d (pid %d) killed by signal %d, restarting",
+                            id, pid, os.WTERMSIG(status))
+        elif os.WEXITSTATUS(status) != 0:
+            logging.warning("child %d (pid %d) exited with status %d, restarting",
+                            id, pid, os.WEXITSTATUS(status))
+        else:
+            logging.info("child %d (pid %d) exited normally", id, pid)
+            continue
+        num_restarts += 1
+        if num_restarts > max_restarts:
+            raise RuntimeError("Too many child restarts, giving up")
+        new_id = start_child(id)
+        if new_id is not None: return new_id
+
+def task_id():
+    """Returns the current task id, if any.
+
+    Returns None if this process was not created by `fork_processes`.
+    """
+    global _task_id
+    return _task_id
diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py
new file mode 100644 (file)
index 0000000..35d3325
--- /dev/null
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+import functools
+import os
+import signal
+from tornado.httpclient import HTTPClient
+from tornado.httpserver import HTTPServer
+from tornado.ioloop import IOLoop
+from tornado.netutil import bind_sockets
+from tornado.process import fork_processes, task_id
+from tornado.testing import LogTrapTestCase, get_unused_port
+from tornado.web import RequestHandler, Application
+
+# Not using AsyncHTTPTestCase because we need control over the IOLoop.
+class ProcessTest(LogTrapTestCase):
+    def get_app(self):
+        class ProcessHandler(RequestHandler):
+            def get(self):
+                if self.get_argument("exit", None):
+                    # must use os._exit instead of sys.exit so unittest's
+                    # exception handler doesn't catch it
+                    IOLoop.instance().add_callback(functools.partial(
+                            os._exit, int(self.get_argument("exit"))))
+                if self.get_argument("signal", None):
+                    IOLoop.instance().add_callback(functools.partial(
+                            os.kill, os.getpid(),
+                            int(self.get_argument("signal"))))
+                self.write(str(os.getpid()))
+        return Application([("/", ProcessHandler)])
+
+    def tearDown(self):
+        if task_id() is not None:
+            # We're in a child process, and probably got to this point
+            # via an uncaught exception.  If we return now, both
+            # processes will continue with the rest of the test suite.
+            # Exit now so the parent process will restart the child
+            # (since we don't have a clean way to signal failure to
+            # the parent that won't restart)
+            os._exit(1)
+        super(ProcessTest, self).tearDown()
+
+    def test_multi_process(self):
+        self.assertFalse(IOLoop.initialized())
+        port = get_unused_port()
+        def get_url(path):
+            return "http://127.0.0.1:%d%s" % (port, path)
+        sockets = bind_sockets(port, "127.0.0.1")
+        # ensure that none of these processes live too long
+        signal.alarm(5)  # master process
+        id = fork_processes(3, max_restarts=3)
+        if id is None:
+            # back in the master process; everything worked!
+            self.assertTrue(task_id() is None)
+            for sock in sockets: sock.close()
+            signal.alarm(0)
+            return
+        signal.alarm(5)  # child process
+        if id in (0, 1):
+            signal.alarm(5)
+            self.assertEqual(id, task_id())
+            server = HTTPServer(self.get_app())
+            server.add_sockets(sockets)
+            IOLoop.instance().start()
+        elif id == 2:
+            signal.alarm(5)
+            self.assertEqual(id, task_id())
+            for sock in sockets: sock.close()
+            client = HTTPClient()
+
+            # Make both processes exit abnormally
+            client.fetch(get_url("/?exit=2"))
+            client.fetch(get_url("/?exit=3"))
+
+            # They've been restarted, so a new fetch will work
+            int(client.fetch(get_url("/")).body)
+            
+            # Now the same with signals
+            # Disabled because on the mac a process dying with a signal
+            # can trigger an "Application exited abnormally; send error
+            # report to Apple?" prompt.
+            #client.fetch(get_url("/?signal=%d" % signal.SIGTERM))
+            #client.fetch(get_url("/?signal=%d" % signal.SIGABRT))
+            #int(client.fetch(get_url("/")).body)
+
+            # Now kill them normally so they won't be restarted
+            client.fetch(get_url("/?exit=0"))
+            # One process left; watch its pid change
+            pid = int(client.fetch(get_url("/")).body)
+            client.fetch(get_url("/?exit=1"))
+            pid2 = int(client.fetch(get_url("/")).body)
+            self.assertNotEqual(pid, pid2)
+
+            # Kill the last one so we shut down cleanly
+            client.fetch(get_url("/?exit=0"))
+            
+            os._exit(0)
+            
+
index e1b1a4f534b16cf56c7d084a8f6cc8091f723d95..7f6d3b68a64420a6d9b10f992f3e0106cd051148 100755 (executable)
@@ -13,6 +13,7 @@ TEST_MODULES = [
     'tornado.test.import_test',
     'tornado.test.ioloop_test',
     'tornado.test.iostream_test',
+    'tornado.test.process_test',
     'tornado.test.simple_httpclient_test',
     'tornado.test.stack_context_test',
     'tornado.test.template_test',