From: Codeb Fan Date: Wed, 21 Aug 2013 06:56:29 +0000 (+0800) Subject: Add timeout for requests in SimpleAsyncHTTPClient's queue. X-Git-Tag: v3.2.0b1~52^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e48c794c3408ef5e8ca255297f3c5af3d0ca7a35;p=thirdparty%2Ftornado.git Add timeout for requests in SimpleAsyncHTTPClient's queue. The connect_timeout and request_timeout in HTTPRequest only matter when the _HTTPConnection get created. In heavy-loadded situation requests in SimpleAsyncHTTPClient may have already timeouted before they pop out from queue and get actual processed. This commit adds an timeout callback for every request appended to SimpleAsyncHTTPClient's request queue that cannot be processed immediately. --- diff --git a/tornado/httpclient.py b/tornado/httpclient.py index 676758942..91dc2db8b 100644 --- a/tornado/httpclient.py +++ b/tornado/httpclient.py @@ -241,6 +241,7 @@ class HTTPRequest(object): _DEFAULTS = dict( connect_timeout=20.0, request_timeout=20.0, + async_timeout=20.0, follow_redirects=True, max_redirects=5, use_gzip=True, @@ -251,6 +252,7 @@ class HTTPRequest(object): def __init__(self, url, method="GET", headers=None, body=None, auth_username=None, auth_password=None, auth_mode=None, connect_timeout=None, request_timeout=None, + async_timeout=None, if_modified_since=None, follow_redirects=None, max_redirects=None, user_agent=None, use_gzip=None, network_interface=None, streaming_callback=None, @@ -340,6 +342,7 @@ class HTTPRequest(object): self.auth_mode = auth_mode self.connect_timeout = connect_timeout self.request_timeout = request_timeout + self.async_timeout = async_timeout self.follow_redirects = follow_redirects self.max_redirects = max_redirects self.user_agent = user_agent diff --git a/tornado/simple_httpclient.py b/tornado/simple_httpclient.py index d8dbb271a..75cb9ff7b 100644 --- a/tornado/simple_httpclient.py +++ b/tornado/simple_httpclient.py @@ -72,6 +72,7 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient): self.max_clients = max_clients self.queue = collections.deque() self.active = {} + self.waiting = {} self.max_buffer_size = max_buffer_size if resolver: self.resolver = resolver @@ -89,7 +90,13 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient): self.resolver.close() def fetch_impl(self, request, callback): - self.queue.append((request, callback)) + key = object() + self.queue.append((key, request, callback)) + if not len(self.active) < self.max_clients: + timeout_handle = self.io_loop.add_timeout( + request.start_time + request.async_timeout, + functools.partial(self._on_timeout, key)) + self.waiting[key] = (request, callback, timeout_handle) self._process_queue() if self.queue: gen_log.debug("max_clients limit reached, request queued. " @@ -99,8 +106,8 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient): def _process_queue(self): with stack_context.NullContext(): while self.queue and len(self.active) < self.max_clients: - request, callback = self.queue.popleft() - key = object() + key, request, callback = self.queue.popleft() + self._remove_timeout(key) self.active[key] = (request, callback) release_callback = functools.partial(self._release_fetch, key) self._handle_request(request, release_callback, callback) @@ -113,6 +120,21 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient): del self.active[key] self._process_queue() + def _remove_timeout(self, key): + if key in self.waiting: + request, callback, timeout_handle = self.waiting[key] + self.io_loop.remove_timeout(timeout_handle) + del self.waiting[key] + + def _on_timeout(self, key): + request, callback, timeout_handle = self.waiting[key] + self.queue.remove((key, request, callback)) + timeout_response = HTTPResponse( + request, 599, error=HTTPError(599, "Timeout"), + request_time=self.io_loop.time() - request.start_time) + self.io_loop.add_callback(callback, timeout_response) + del self.waiting[key] + class _HTTPConnection(object): _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])