]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add timeout for requests in SimpleAsyncHTTPClient's queue.
authorCodeb Fan <codeb2cc@gmail.com>
Wed, 21 Aug 2013 06:56:29 +0000 (14:56 +0800)
committerCodeb Fan <codeb2cc@gmail.com>
Wed, 21 Aug 2013 06:56:29 +0000 (14:56 +0800)
The connect_timeout and request_timeout in HTTPRequest only matter when
the _HTTPConnection get created. In heavy-loadded situation requests in
SimpleAsyncHTTPClient may have already timeouted before they pop out
from queue and get actual processed. This commit adds an timeout
callback for every request appended to SimpleAsyncHTTPClient's request
queue that cannot be processed immediately.

tornado/httpclient.py
tornado/simple_httpclient.py

index 676758942e53be1ebcfc28d553d4fe1eedee86ec..91dc2db8b5373b6ca485d396003284fa7aad7ce3 100644 (file)
@@ -241,6 +241,7 @@ class HTTPRequest(object):
     _DEFAULTS = dict(
         connect_timeout=20.0,
         request_timeout=20.0,
+        async_timeout=20.0,
         follow_redirects=True,
         max_redirects=5,
         use_gzip=True,
@@ -251,6 +252,7 @@ class HTTPRequest(object):
     def __init__(self, url, method="GET", headers=None, body=None,
                  auth_username=None, auth_password=None, auth_mode=None,
                  connect_timeout=None, request_timeout=None,
+                 async_timeout=None,
                  if_modified_since=None, follow_redirects=None,
                  max_redirects=None, user_agent=None, use_gzip=None,
                  network_interface=None, streaming_callback=None,
@@ -340,6 +342,7 @@ class HTTPRequest(object):
         self.auth_mode = auth_mode
         self.connect_timeout = connect_timeout
         self.request_timeout = request_timeout
+        self.async_timeout = async_timeout
         self.follow_redirects = follow_redirects
         self.max_redirects = max_redirects
         self.user_agent = user_agent
index d8dbb271a6d11c0b191a7c733ef5136d1e65ed18..75cb9ff7b849c085ee8a609c6420f0fd2ab0939b 100644 (file)
@@ -72,6 +72,7 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient):
         self.max_clients = max_clients
         self.queue = collections.deque()
         self.active = {}
+        self.waiting = {}
         self.max_buffer_size = max_buffer_size
         if resolver:
             self.resolver = resolver
@@ -89,7 +90,13 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient):
             self.resolver.close()
 
     def fetch_impl(self, request, callback):
-        self.queue.append((request, callback))
+        key = object()
+        self.queue.append((key, request, callback))
+        if not len(self.active) < self.max_clients:
+            timeout_handle = self.io_loop.add_timeout(
+                    request.start_time + request.async_timeout,
+                    functools.partial(self._on_timeout, key))
+            self.waiting[key] = (request, callback, timeout_handle)
         self._process_queue()
         if self.queue:
             gen_log.debug("max_clients limit reached, request queued. "
@@ -99,8 +106,8 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient):
     def _process_queue(self):
         with stack_context.NullContext():
             while self.queue and len(self.active) < self.max_clients:
-                request, callback = self.queue.popleft()
-                key = object()
+                key, request, callback = self.queue.popleft()
+                self._remove_timeout(key)
                 self.active[key] = (request, callback)
                 release_callback = functools.partial(self._release_fetch, key)
                 self._handle_request(request, release_callback, callback)
@@ -113,6 +120,21 @@ class SimpleAsyncHTTPClient(AsyncHTTPClient):
         del self.active[key]
         self._process_queue()
 
+    def _remove_timeout(self, key):
+        if key in self.waiting:
+            request, callback, timeout_handle = self.waiting[key]
+            self.io_loop.remove_timeout(timeout_handle)
+            del self.waiting[key]
+
+    def _on_timeout(self, key):
+        request, callback, timeout_handle = self.waiting[key]
+        self.queue.remove((key, request, callback))
+        timeout_response = HTTPResponse(
+                request, 599, error=HTTPError(599, "Timeout"),
+                request_time=self.io_loop.time() - request.start_time)
+        self.io_loop.add_callback(callback, timeout_response)
+        del self.waiting[key]
+
 
 class _HTTPConnection(object):
     _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])