Add gen.with_timeout wrapper.
def chain_future(a, b):
"""Chain two futures together so that when one completes, so does the other.
- The result (success or failure) of ``a`` will be copied to ``b``.
+ The result (success or failure) of ``a`` will be copied to ``b``, unless
+ ``b`` has already been completed or cancelled by the time ``a`` finishes.
"""
def copy(future):
assert future is a
+ if b.done():
+ return
if (isinstance(a, TracebackFuture) and isinstance(b, TracebackFuture)
and a.exc_info() is not None):
b.set_exc_info(a.exc_info())
import sys
import types
-from tornado.concurrent import Future, TracebackFuture, is_future
+from tornado.concurrent import Future, TracebackFuture, is_future, chain_future
from tornado.ioloop import IOLoop
from tornado import stack_context
pass
+class TimeoutError(Exception):
+ """Exception raised by ``with_timeout``."""
+
+
def engine(func):
"""Callback-oriented decorator for asynchronous generators.
return fut
+def with_timeout(timeout, future, io_loop=None):
+ """Wraps a `.Future` in a timeout.
+
+ Raises `TimeoutError` if the input future does not complete before
+ ``timeout``, which may be specified in any form allowed by
+ `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or an absolute time
+ relative to `.IOLoop.time`)
+
+ Currently only supports Futures, not other `YieldPoint` classes.
+ """
+ # TODO: allow yield points in addition to futures?
+ # Tricky to do with stack_context semantics.
+ #
+ # It would be more efficient to cancel the input future on timeout instead
+ # of creating a new one, but we can't know if we are the only one waiting
+ # on the input future, so cancelling it might disrupt other callers.
+ result = Future()
+ chain_future(future, result)
+ if io_loop is None:
+ io_loop = IOLoop.current()
+ timeout_handle = io_loop.add_timeout(
+ timeout,
+ lambda: result.set_exception(TimeoutError("Timeout")))
+ io_loop.add_future(future,
+ lambda future: io_loop.remove_timeout(timeout_handle))
+ return result
+
+
_null_future = Future()
_null_future.set_result(None)
from __future__ import absolute_import, division, print_function, with_statement
+import datetime
import socket
from tornado.concurrent import Future
"""
def __init__(self, stream, address, is_client,
no_keep_alive=False, protocol=None, chunk_size=None,
- max_header_size=None):
+ max_header_size=None, header_timeout=None):
self.is_client = is_client
self.stream = stream
self.address = address
self.protocol = "http"
self._chunk_size = chunk_size or 65536
self._max_header_size = max_header_size or 65536
+ self._header_timeout = header_timeout
self._disconnect_on_finish = False
self._clear_request_state()
self.stream.set_close_callback(self._on_connection_close)
assert isinstance(delegate, httputil.HTTPMessageDelegate)
self.message_delegate = delegate
try:
- header_data = yield self.stream.read_until_regex(
- b"\r?\n\r?\n",
- max_bytes=self._max_header_size)
+ header_future = self.stream.read_until_regex(
+ b"\r?\n\r?\n",
+ max_bytes=self._max_header_size)
+ if self._header_timeout is None:
+ header_data = yield header_future
+ else:
+ try:
+ header_data = yield gen.with_timeout(
+ datetime.timedelta(seconds=self._header_timeout),
+ header_future,
+ io_loop=self.stream.io_loop)
+ except gen.TimeoutError:
+ self.close()
+ raise gen.Return(False)
self._reading = True
self._finish_future = Future()
start_line, headers = self._parse_headers(header_data)
"""
def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
xheaders=False, ssl_options=None, protocol=None, gzip=False,
- chunk_size=None, max_header_size=None, **kwargs):
+ chunk_size=None, max_header_size=None,
+ idle_connection_timeout=None, **kwargs):
self.request_callback = request_callback
self.no_keep_alive = no_keep_alive
self.xheaders = xheaders
self.gzip = gzip
self.chunk_size = chunk_size
self.max_header_size = max_header_size
+ self.idle_connection_timeout = idle_connection_timeout or 3600
TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
**kwargs)
def handle_stream(self, stream, address):
- conn = HTTP1Connection(stream, address=address, is_client=False,
- no_keep_alive=self.no_keep_alive,
- protocol=self.protocol,
- chunk_size=self.chunk_size,
- max_header_size=self.max_header_size)
+ conn = HTTP1Connection(
+ stream, address=address, is_client=False,
+ no_keep_alive=self.no_keep_alive,
+ protocol=self.protocol,
+ chunk_size=self.chunk_size,
+ max_header_size=self.max_header_size,
+ header_timeout=self.idle_connection_timeout)
conn.start_serving(self, gzip=self.gzip)
def start_request(self, connection):
from __future__ import absolute_import, division, print_function, with_statement
import contextlib
+import datetime
import functools
import sys
import textwrap
import platform
import weakref
-from tornado.concurrent import return_future
+from tornado.concurrent import return_future, Future
from tornado.escape import url_escape
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
response = self.fetch('/async_prepare_error')
self.assertEqual(response.code, 403)
+
+class WithTimeoutTest(AsyncTestCase):
+ @gen_test
+ def test_timeout(self):
+ with self.assertRaises(gen.TimeoutError):
+ yield gen.with_timeout(datetime.timedelta(seconds=0.1),
+ Future())
+
+ @gen_test
+ def test_completes_before_timeout(self):
+ future = Future()
+ self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
+ lambda: future.set_result('asdf'))
+ result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
+ future)
+ self.assertEqual(result, 'asdf')
+
+ @gen_test
+ def test_fails_before_timeout(self):
+ future = Future()
+ self.io_loop.add_timeout(
+ datetime.timedelta(seconds=0.1),
+ lambda: future.set_exception(ZeroDivisionError))
+ with self.assertRaises(ZeroDivisionError):
+ yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
+
+ @gen_test
+ def test_already_resolved(self):
+ future = Future()
+ future.set_result('asdf')
+ result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
+ future)
+ self.assertEqual(result, 'asdf')
+
+
if __name__ == '__main__':
unittest.main()
from tornado.netutil import ssl_options_to_context
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, ExpectLog
-from tornado.test.util import unittest
+from tornado.test.util import unittest, skipOnTravis
from tornado.util import u, bytes_type
from tornado.web import Application, RequestHandler, asynchronous
from contextlib import closing
with ExpectLog(gen_log, "Unsatisfiable read"):
response = self.fetch("/", headers={'X-Filler': 'a' * 1000})
self.assertEqual(response.code, 599)
+
+
+@skipOnTravis
+class IdleTimeoutTest(AsyncHTTPTestCase):
+ def get_app(self):
+ return Application([('/', HelloWorldRequestHandler)])
+
+ def get_httpserver_options(self):
+ return dict(idle_connection_timeout=0.1)
+
+ def setUp(self):
+ super(IdleTimeoutTest, self).setUp()
+ self.streams = []
+
+ def tearDown(self):
+ super(IdleTimeoutTest, self).tearDown()
+ for stream in self.streams:
+ stream.close()
+
+ def connect(self):
+ stream = IOStream(socket.socket())
+ stream.connect(('localhost', self.get_http_port()), self.stop)
+ self.wait()
+ self.streams.append(stream)
+ return stream
+
+ def test_unused_connection(self):
+ stream = self.connect()
+ stream.set_close_callback(self.stop)
+ self.wait()
+
+ def test_idle_after_use(self):
+ stream = self.connect()
+ stream.set_close_callback(lambda: self.stop("closed"))
+
+ # Use the connection twice to make sure keep-alives are working
+ for i in range(2):
+ stream.write(b"GET / HTTP/1.1\r\n\r\n")
+ stream.read_until(b"\r\n\r\n", self.stop)
+ self.wait()
+ stream.read_bytes(11, self.stop)
+ data = self.wait()
+ self.assertEqual(data, b"Hello world")
+
+ # Now let the timeout trigger and close the connection.
+ data = self.wait()
+ self.assertEqual(data, "closed")