and use it throughout the test suite.
import functools
from tornado.escape import url_escape
from tornado.httpclient import AsyncHTTPClient
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase
+from tornado.log import app_log
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog
from tornado.util import b
from tornado.web import Application, RequestHandler, asynchronous
self.finish('ok')
-class GenWebTest(AsyncHTTPTestCase, LogTrapTestCase):
+class GenWebTest(AsyncHTTPTestCase):
def get_app(self):
return Application([
('/sequence', GenSequenceHandler),
def test_exception_handler(self):
# Make sure we get an error and not a timeout
- response = self.fetch('/exception')
+ with ExpectLog(app_log, "Uncaught exception GET /exception"):
+ response = self.fetch('/exception')
self.assertEqual(500, response.code)
def test_yield_exception_handler(self):
from tornado.escape import json_decode, utf8, _unicode, recursive_unicode, native_str
from tornado.httpserver import HTTPServer
from tornado.httputil import HTTPHeaders
-from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
+from tornado.log import gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, LogTrapTestCase
+from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, ExpectLog
from tornado.test.util import unittest
from tornado.util import b, bytes_type
from tornado.web import Application, RequestHandler
"old version of ssl module and/or openssl")
-class BaseSSLTest(AsyncHTTPSTestCase, LogTrapTestCase):
+class BaseSSLTest(AsyncHTTPSTestCase):
def get_app(self):
return Application([('/', HelloWorldRequestHandler,
dict(protocol="https"))])
# Make sure the server closes the connection when it gets a non-ssl
# connection, rather than waiting for a timeout or otherwise
# misbehaving.
- self.http_client.fetch(self.get_url("/"), self.stop,
- request_timeout=3600,
- connect_timeout=3600)
- response = self.wait()
+ with ExpectLog(gen_log, '(SSL Error|uncaught exception)'):
+ self.http_client.fetch(self.get_url("/"), self.stop,
+ request_timeout=3600,
+ connect_timeout=3600)
+ response = self.wait()
self.assertEqual(response.code, 599)
# Python's SSL implementation differs significantly between versions.
from __future__ import absolute_import, division, with_statement
from tornado.httputil import url_concat, parse_multipart_form_data, HTTPHeaders
from tornado.escape import utf8
-from tornado.testing import LogTrapTestCase
+from tornado.log import gen_log
+from tornado.testing import ExpectLog
from tornado.test.util import unittest
from tornado.util import b
import logging
self.assertEqual(url, "https://localhost/path?r=1&t=2")
-class MultipartFormDataTest(LogTrapTestCase):
+class MultipartFormDataTest(unittest.TestCase):
def test_file_upload(self):
data = b("""\
--1234
'a\\b.txt',
]
for filename in filenames:
- logging.info("trying filename %r", filename)
+ logging.debug("trying filename %r", filename)
data = """\
--1234
Content-Disposition: form-data; name="files"; filename="%s"
--1234--''').replace(b("\n"), b("\r\n"))
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ with ExpectLog(gen_log, "multipart/form-data missing headers"):
+ parse_multipart_form_data(b("1234"), data, args, files)
self.assertEqual(files, {})
def test_invalid_content_disposition(self):
--1234--''').replace(b("\n"), b("\r\n"))
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ with ExpectLog(gen_log, "Invalid multipart/form-data"):
+ parse_multipart_form_data(b("1234"), data, args, files)
self.assertEqual(files, {})
def test_line_does_not_end_with_correct_line_break(self):
Foo--1234--''').replace(b("\n"), b("\r\n"))
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ with ExpectLog(gen_log, "Invalid multipart/form-data"):
+ parse_multipart_form_data(b("1234"), data, args, files)
self.assertEqual(files, {})
def test_content_disposition_header_without_name_parameter(self):
--1234--""").replace(b("\n"), b("\r\n"))
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ with ExpectLog(gen_log, "multipart/form-data value missing name"):
+ parse_multipart_form_data(b("1234"), data, args, files)
self.assertEqual(files, {})
def test_data_after_final_boundary(self):
from tornado import netutil
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
-from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, LogTrapTestCase, get_unused_port
+from tornado.log import gen_log
+from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, get_unused_port, ExpectLog
from tornado.test.util import unittest
from tornado.util import b
from tornado.web import RequestHandler, Application
try:
self.wait(lambda: connected[0] and written[0])
finally:
- logging.info((connected, written))
+ logging.debug((connected, written))
stream.read_until_close(self.stop)
data = self.wait()
def connect_callback():
self.connect_called = True
stream.set_close_callback(self.stop)
- stream.connect(("localhost", port), connect_callback)
- self.wait()
+ # log messages vary by platform and ioloop implementation
+ with ExpectLog(gen_log, ".*", required=False):
+ stream.connect(("localhost", port), connect_callback)
+ self.wait()
self.assertFalse(self.connect_called)
self.assertTrue(isinstance(stream.error, socket.error), stream.error)
if sys.platform != 'cygwin':
# instead of a name that's simply unlikely to exist (since
# opendns and some ISPs return bogus addresses for nonexistent
# domains instead of the proper error codes).
- stream.connect(('an invalid domain', 54321))
- self.assertTrue(isinstance(stream.error, socket.gaierror), stream.error)
+ with ExpectLog(gen_log, "Connect error"):
+ stream.connect(('an invalid domain', 54321))
+ self.assertTrue(isinstance(stream.error, socket.gaierror), stream.error)
def test_streaming_callback(self):
server, client = self.make_iostream_pair()
client.close()
-class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase,
- LogTrapTestCase):
+class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
def _make_client_iostream(self):
return IOStream(socket.socket(), io_loop=self.io_loop)
-class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase,
- LogTrapTestCase):
+class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
def _make_client_iostream(self):
return SSLIOStream(socket.socket(), io_loop=self.io_loop)
TestIOStreamWebHTTPS = skipIfNoSSL(TestIOStreamWebHTTPS)
-class TestIOStream(TestIOStreamMixin, AsyncTestCase, LogTrapTestCase):
+class TestIOStream(TestIOStreamMixin, AsyncTestCase):
def _make_server_iostream(self, connection, **kwargs):
return IOStream(connection, io_loop=self.io_loop, **kwargs)
return IOStream(connection, io_loop=self.io_loop, **kwargs)
-class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase, LogTrapTestCase):
+class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
def _make_server_iostream(self, connection, **kwargs):
ssl_options = dict(
certfile=os.path.join(os.path.dirname(__file__), 'test.crt'),
from tornado.httpclient import HTTPClient, HTTPError
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
+from tornado.log import gen_log
from tornado.netutil import bind_sockets
from tornado.process import fork_processes, task_id
from tornado.simple_httpclient import SimpleAsyncHTTPClient
-from tornado.testing import LogTrapTestCase, get_unused_port
+from tornado.testing import get_unused_port, ExpectLog
from tornado.test.util import unittest
from tornado.web import RequestHandler, Application
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
-# Logging is tricky here so you may want to replace LogTrapTestCase
-# with unittest.TestCase when debugging.
-
-
-class ProcessTest(LogTrapTestCase):
+class ProcessTest(unittest.TestCase):
def get_app(self):
class ProcessHandler(RequestHandler):
def get(self):
super(ProcessTest, self).tearDown()
def test_multi_process(self):
- self.assertFalse(IOLoop.initialized())
- port = get_unused_port()
+ with ExpectLog(gen_log, "(Starting .* processes|child .* exited|uncaught exception)"):
+ self.assertFalse(IOLoop.initialized())
+ port = get_unused_port()
- def get_url(path):
- return "http://127.0.0.1:%d%s" % (port, path)
- sockets = bind_sockets(port, "127.0.0.1")
- # ensure that none of these processes live too long
- signal.alarm(5) # master process
- try:
- id = fork_processes(3, max_restarts=3)
- self.assertTrue(id is not None)
- signal.alarm(5) # child processes
- except SystemExit, e:
- # if we exit cleanly from fork_processes, all the child processes
- # finished with status 0
- self.assertEqual(e.code, 0)
- self.assertTrue(task_id() is None)
- for sock in sockets:
- sock.close()
- return
- try:
- if id in (0, 1):
- self.assertEqual(id, task_id())
- server = HTTPServer(self.get_app())
- server.add_sockets(sockets)
- IOLoop.instance().start()
- elif id == 2:
- self.assertEqual(id, task_id())
+ def get_url(path):
+ return "http://127.0.0.1:%d%s" % (port, path)
+ sockets = bind_sockets(port, "127.0.0.1")
+ # ensure that none of these processes live too long
+ signal.alarm(5) # master process
+ try:
+ id = fork_processes(3, max_restarts=3)
+ self.assertTrue(id is not None)
+ signal.alarm(5) # child processes
+ except SystemExit, e:
+ # if we exit cleanly from fork_processes, all the child processes
+ # finished with status 0
+ self.assertEqual(e.code, 0)
+ self.assertTrue(task_id() is None)
for sock in sockets:
sock.close()
- # Always use SimpleAsyncHTTPClient here; the curl
- # version appears to get confused sometimes if the
- # connection gets closed before it's had a chance to
- # switch from writing mode to reading mode.
- client = HTTPClient(SimpleAsyncHTTPClient)
+ return
+ try:
+ if id in (0, 1):
+ self.assertEqual(id, task_id())
+ server = HTTPServer(self.get_app())
+ server.add_sockets(sockets)
+ IOLoop.instance().start()
+ elif id == 2:
+ self.assertEqual(id, task_id())
+ for sock in sockets:
+ sock.close()
+ # Always use SimpleAsyncHTTPClient here; the curl
+ # version appears to get confused sometimes if the
+ # connection gets closed before it's had a chance to
+ # switch from writing mode to reading mode.
+ client = HTTPClient(SimpleAsyncHTTPClient)
- def fetch(url, fail_ok=False):
- try:
- return client.fetch(get_url(url))
- except HTTPError, e:
- if not (fail_ok and e.code == 599):
- raise
+ def fetch(url, fail_ok=False):
+ try:
+ return client.fetch(get_url(url))
+ except HTTPError, e:
+ if not (fail_ok and e.code == 599):
+ raise
- # Make two processes exit abnormally
- fetch("/?exit=2", fail_ok=True)
- fetch("/?exit=3", fail_ok=True)
+ # Make two processes exit abnormally
+ fetch("/?exit=2", fail_ok=True)
+ fetch("/?exit=3", fail_ok=True)
- # They've been restarted, so a new fetch will work
- int(fetch("/").body)
+ # They've been restarted, so a new fetch will work
+ int(fetch("/").body)
- # Now the same with signals
- # Disabled because on the mac a process dying with a signal
- # can trigger an "Application exited abnormally; send error
- # report to Apple?" prompt.
- #fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
- #fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
- #int(fetch("/").body)
+ # Now the same with signals
+ # Disabled because on the mac a process dying with a signal
+ # can trigger an "Application exited abnormally; send error
+ # report to Apple?" prompt.
+ #fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
+ #fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
+ #int(fetch("/").body)
- # Now kill them normally so they won't be restarted
- fetch("/?exit=0", fail_ok=True)
- # One process left; watch it's pid change
- pid = int(fetch("/").body)
- fetch("/?exit=4", fail_ok=True)
- pid2 = int(fetch("/").body)
- self.assertNotEqual(pid, pid2)
+ # Now kill them normally so they won't be restarted
+ fetch("/?exit=0", fail_ok=True)
+ # One process left; watch it's pid change
+ pid = int(fetch("/").body)
+ fetch("/?exit=4", fail_ok=True)
+ pid2 = int(fetch("/").body)
+ self.assertNotEqual(pid, pid2)
- # Kill the last one so we shut down cleanly
- fetch("/?exit=0", fail_ok=True)
+ # Kill the last one so we shut down cleanly
+ fetch("/?exit=0", fail_ok=True)
- os._exit(0)
- except Exception:
- logging.error("exception in child process %d", id, exc_info=True)
- raise
+ os._exit(0)
+ except Exception:
+ logging.error("exception in child process %d", id, exc_info=True)
+ raise
ProcessTest = unittest.skipIf(os.name != 'posix' or sys.platform == 'cygwin',
"non-unix platform")(ProcessTest)
from tornado.httpclient import AsyncHTTPClient
from tornado.httputil import HTTPHeaders
from tornado.ioloop import IOLoop
+from tornado.log import gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient, _DEFAULT_CA_CERTS
from tornado.test.httpclient_test import ChunkHandler, CountdownHandler, HelloWorldHandler
from tornado.test import httpclient_test
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase, get_unused_port
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, get_unused_port, ExpectLog
from tornado.test.util import unittest
from tornado.util import b
from tornado.web import RequestHandler, Application, asynchronous, url
@asynchronous
def get(self):
- logging.info("queuing trigger")
+ logging.debug("queuing trigger")
self.queue.append(self.finish)
if self.get_argument("wake", "true") == "true":
self.wake_callback()
self.write(self.request.headers["Host"])
-class SimpleHTTPClientTestCase(AsyncHTTPTestCase, LogTrapTestCase):
+class SimpleHTTPClientTestCase(AsyncHTTPTestCase):
def setUp(self):
super(SimpleHTTPClientTestCase, self).setUp()
self.http_client = SimpleAsyncHTTPClient(self.io_loop)
self.assertEqual("POST", response.request.method)
def test_request_timeout(self):
- response = self.fetch('/trigger?wake=false', request_timeout=0.1)
+ with ExpectLog(gen_log, "uncaught exception"):
+ response = self.fetch('/trigger?wake=false', request_timeout=0.1)
self.assertEqual(response.code, 599)
self.assertTrue(0.099 < response.request_time < 0.12, response.request_time)
self.assertEqual(str(response.error), "HTTP 599: Timeout")
url = self.get_url("/hello").replace("localhost", "[::1]")
# ipv6 is currently disabled by default and must be explicitly requested
- self.http_client.fetch(url, self.stop)
- response = self.wait()
+ with ExpectLog(gen_log, "uncaught exception"):
+ self.http_client.fetch(url, self.stop)
+ response = self.wait()
self.assertEqual(response.code, 599)
self.http_client.fetch(url, self.stop, allow_ipv6=True)
response = self.fetch("/content_length?value=2,%202,2")
self.assertEqual(response.body, b("ok"))
- response = self.fetch("/content_length?value=2,4")
- self.assertEqual(response.code, 599)
- response = self.fetch("/content_length?value=2,%202,3")
- self.assertEqual(response.code, 599)
+ with ExpectLog(gen_log, "uncaught exception"):
+ response = self.fetch("/content_length?value=2,4")
+ self.assertEqual(response.code, 599)
+ response = self.fetch("/content_length?value=2,%202,3")
+ self.assertEqual(response.code, 599)
def test_head_request(self):
response = self.fetch("/head", method="HEAD")
self.assertEqual(response.headers["Content-length"], "0")
# 204 status with non-zero content length is malformed
- response = self.fetch("/no_content?error=1")
- self.assertEqual(response.code, 599)
+ with ExpectLog(gen_log, "uncaught exception"):
+ response = self.fetch("/no_content?error=1")
+ self.assertEqual(response.code, 599)
def test_host_header(self):
host_re = re.compile(b("^localhost:[0-9]+$"))
def test_connection_refused(self):
port = get_unused_port()
- self.http_client.fetch("http://localhost:%d/" % port, self.stop)
- response = self.wait()
+ with ExpectLog(gen_log, ".*"):
+ self.http_client.fetch("http://localhost:%d/" % port, self.stop)
+ response = self.wait()
self.assertEqual(599, response.code)
if sys.platform != 'cygwin':
#!/usr/bin/env python
from __future__ import absolute_import, division, with_statement
+from tornado.log import app_log
from tornado.stack_context import StackContext, wrap
-from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog
from tornado.test.util import unittest
from tornado.util import b
from tornado.web import asynchronous, Application, RequestHandler
@asynchronous
def get(self):
- logging.info('in get()')
+ logging.debug('in get()')
# call self.part2 without a self.async_callback wrapper. Its
# exception should still get thrown
self.io_loop.add_callback(self.part2)
def part2(self):
- logging.info('in part2()')
+ logging.debug('in part2()')
# Go through a third layer to make sure that contexts once restored
# are again passed on to future callbacks
self.io_loop.add_callback(self.part3)
def part3(self):
- logging.info('in part3()')
+ logging.debug('in part3()')
raise Exception('test exception')
def get_error_html(self, status_code, **kwargs):
return 'unexpected failure'
-class HTTPStackContextTest(AsyncHTTPTestCase, LogTrapTestCase):
+class HTTPStackContextTest(AsyncHTTPTestCase):
def get_app(self):
return Application([('/', TestRequestHandler,
dict(io_loop=self.io_loop))])
def test_stack_context(self):
- self.http_client.fetch(self.get_url('/'), self.handle_response)
- self.wait()
+ with ExpectLog(app_log, "Uncaught exception GET /"):
+ self.http_client.fetch(self.get_url('/'), self.handle_response)
+ self.wait()
self.assertEqual(self.response.code, 500)
self.assertTrue(b('got expected exception') in self.response.body)
self.stop()
-class StackContextTest(AsyncTestCase, LogTrapTestCase):
+class StackContextTest(AsyncTestCase):
def setUp(self):
super(StackContextTest, self).setUp()
self.active_contexts = []
from tornado.escape import utf8, native_str, to_unicode
from tornado.template import Template, DictLoader, ParseError, Loader
-from tornado.testing import LogTrapTestCase
+from tornado.testing import ExpectLog
from tornado.test.util import unittest
from tornado.util import b, bytes_type, ObjectDict
pass
-class StackTraceTest(LogTrapTestCase):
+class StackTraceTest(unittest.TestCase):
def test_error_line_number_expression(self):
loader = DictLoader({"test.html": """one
two{{1/0}}
from tornado import gen
from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring
from tornado.iostream import IOStream
+from tornado.log import app_log, gen_log
from tornado.template import DictLoader
-from tornado.testing import LogTrapTestCase, AsyncHTTPTestCase
+from tornado.testing import AsyncHTTPTestCase, ExpectLog
+from tornado.test.util import unittest
from tornado.util import b, bytes_type, ObjectDict
from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value
self._cookies[name] = value
-class SecureCookieTest(LogTrapTestCase):
+class SecureCookieTest(unittest.TestCase):
def test_round_trip(self):
handler = CookieTestRequestHandler()
handler.set_secure_cookie('foo', b('bar'))
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
- self.assertTrue(handler.get_secure_cookie('foo') is None)
+ with ExpectLog(gen_log, "Cookie timestamp in future"):
+ self.assertTrue(handler.get_secure_cookie('foo') is None)
def test_arbitrary_bytes(self):
# Secure cookies accept arbitrary data (which is base64 encoded).
self.assertEqual(handler.get_secure_cookie('foo'), b('\xe9'))
-class CookieTest(AsyncHTTPTestCase, LogTrapTestCase):
+class CookieTest(AsyncHTTPTestCase):
def get_app(self):
class SetCookieHandler(RequestHandler):
def get(self):
('foo="a\\"b"', 'a"b'),
]
for header, expected in data:
- logging.info("trying %r", header)
+ logging.debug("trying %r", header)
response = self.fetch("/get", headers={"Cookie": header})
self.assertEqual(response.body, utf8(expected))
self.test.on_connection_close()
-class ConnectionCloseTest(AsyncHTTPTestCase, LogTrapTestCase):
+class ConnectionCloseTest(AsyncHTTPTestCase):
def get_app(self):
return Application([('/', ConnectionCloseHandler, dict(test=self))])
self.wait()
def on_handler_waiting(self):
- logging.info('handler waiting')
+ logging.debug('handler waiting')
self.stream.close()
def on_connection_close(self):
- logging.info('connection closed')
+ logging.debug('connection closed')
self.stop()
self.assertEqual(response.body, b("ok"))
-class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
+class ErrorResponseTest(AsyncHTTPTestCase):
def get_app(self):
class DefaultHandler(RequestHandler):
def get(self):
])
def test_default(self):
- response = self.fetch("/default")
- self.assertEqual(response.code, 500)
- self.assertTrue(b("500: Internal Server Error") in response.body)
+ with ExpectLog(app_log, "Uncaught exception"):
+ response = self.fetch("/default")
+ self.assertEqual(response.code, 500)
+ self.assertTrue(b("500: Internal Server Error") in response.body)
- response = self.fetch("/default?status=503")
- self.assertEqual(response.code, 503)
- self.assertTrue(b("503: Service Unavailable") in response.body)
+ response = self.fetch("/default?status=503")
+ self.assertEqual(response.code, 503)
+ self.assertTrue(b("503: Service Unavailable") in response.body)
def test_write_error(self):
- response = self.fetch("/write_error")
- self.assertEqual(response.code, 500)
- self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
+ with ExpectLog(app_log, "Uncaught exception"):
+ response = self.fetch("/write_error")
+ self.assertEqual(response.code, 500)
+ self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
- response = self.fetch("/write_error?status=503")
- self.assertEqual(response.code, 503)
- self.assertEqual(b("Status: 503"), response.body)
+ response = self.fetch("/write_error?status=503")
+ self.assertEqual(response.code, 503)
+ self.assertEqual(b("Status: 503"), response.body)
def test_get_error_html(self):
- response = self.fetch("/get_error_html")
- self.assertEqual(response.code, 500)
- self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
+ with ExpectLog(app_log, "Uncaught exception"):
+ response = self.fetch("/get_error_html")
+ self.assertEqual(response.code, 500)
+ self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
- response = self.fetch("/get_error_html?status=503")
- self.assertEqual(response.code, 503)
- self.assertEqual(b("Status: 503"), response.body)
+ response = self.fetch("/get_error_html?status=503")
+ self.assertEqual(response.code, 503)
+ self.assertEqual(b("Status: 503"), response.body)
def test_failed_write_error(self):
- response = self.fetch("/failed_write_error")
- self.assertEqual(response.code, 500)
- self.assertEqual(b(""), response.body)
+ with ExpectLog(app_log, "Uncaught exception"):
+ response = self.fetch("/failed_write_error")
+ self.assertEqual(response.code, 500)
+ self.assertEqual(b(""), response.body)
class StaticFileTest(AsyncHTTPTestCase):
self.assertTrue('Last-Modified' not in response2.headers)
-class CustomStaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
+class CustomStaticFileTest(AsyncHTTPTestCase):
def get_app(self):
class MyStaticFileHandler(StaticFileHandler):
def get(self, path):
self.assertEqual(response.body, b("bar"))
def test_static_url(self):
- response = self.fetch("/static_url/foo.txt")
- self.assertEqual(response.body, b("/static/foo.42.txt"))
+ with ExpectLog(gen_log, "Could not open static file"):
+ response = self.fetch("/static_url/foo.txt")
+ self.assertEqual(response.body, b("/static/foo.42.txt"))
class NamedURLSpecGroupsTest(AsyncHTTPTestCase):
import contextlib
import logging
import os
+import re
import signal
import sys
import time
handler.stream = old_stream
+class ExpectLog(logging.Filter):
+ def __init__(self, logger, regex, required=True):
+ if isinstance(logger, basestring):
+ logger = logging.getLogger(logger)
+ self.logger = logger # may be either a Logger or a Handler
+ self.regex = re.compile(regex)
+ self.required = required
+ self.matched = False
+
+ def filter(self, record):
+ message = record.getMessage()
+ if self.regex.match(message):
+ self.matched = True
+ return False
+ return True
+
+ def __enter__(self):
+ self.logger.addFilter(self)
+
+ def __exit__(self, typ, value, tb):
+ self.logger.removeFilter(self)
+ if not typ and self.required and not self.matched:
+ raise Exception("did not get expected log message")
+
+
def main(**kwargs):
"""A simple test runner.