_DEFAULTS = dict(
connect_timeout=20.0,
request_timeout=20.0,
+ async_timeout=20.0,
follow_redirects=True,
max_redirects=5,
use_gzip=True,
def __init__(self, url, method="GET", headers=None, body=None,
auth_username=None, auth_password=None, auth_mode=None,
connect_timeout=None, request_timeout=None,
+ async_timeout=None,
if_modified_since=None, follow_redirects=None,
max_redirects=None, user_agent=None, use_gzip=None,
network_interface=None, streaming_callback=None,
self.auth_mode = auth_mode
self.connect_timeout = connect_timeout
self.request_timeout = request_timeout
+ self.async_timeout = async_timeout
self.follow_redirects = follow_redirects
self.max_redirects = max_redirects
self.user_agent = user_agent
self.max_clients = max_clients
self.queue = collections.deque()
self.active = {}
+ self.waiting = {}
self.max_buffer_size = max_buffer_size
if resolver:
self.resolver = resolver
self.resolver.close()
def fetch_impl(self, request, callback):
- self.queue.append((request, callback))
+ key = object()
+ self.queue.append((key, request, callback))
+ if not len(self.active) < self.max_clients:
+ timeout_handle = self.io_loop.add_timeout(
+ request.start_time + request.async_timeout,
+ functools.partial(self._on_timeout, key))
+ self.waiting[key] = (request, callback, timeout_handle)
self._process_queue()
if self.queue:
gen_log.debug("max_clients limit reached, request queued. "
def _process_queue(self):
with stack_context.NullContext():
while self.queue and len(self.active) < self.max_clients:
- request, callback = self.queue.popleft()
- key = object()
+ key, request, callback = self.queue.popleft()
+ self._remove_timeout(key)
self.active[key] = (request, callback)
release_callback = functools.partial(self._release_fetch, key)
self._handle_request(request, release_callback, callback)
del self.active[key]
self._process_queue()
+ def _remove_timeout(self, key):
+ if key in self.waiting:
+ request, callback, timeout_handle = self.waiting[key]
+ self.io_loop.remove_timeout(timeout_handle)
+ del self.waiting[key]
+
+ def _on_timeout(self, key):
+ request, callback, timeout_handle = self.waiting[key]
+ self.queue.remove((key, request, callback))
+ timeout_response = HTTPResponse(
+ request, 599, error=HTTPError(599, "Timeout"),
+ request_time=self.io_loop.time() - request.start_time)
+ self.io_loop.add_callback(callback, timeout_response)
+ del self.waiting[key]
+
class _HTTPConnection(object):
_SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])