instance = super(AsyncHTTPClient, cls).__new__(cls)
instance.io_loop = io_loop
instance._multi = pycurl.CurlMulti()
- instance._curls = [_curl_create(max_simultaneous_connections)
- for i in xrange(max_clients)]
- instance._free_list = instance._curls[:]
- instance._requests = collections.deque()
- instance._fds = {}
- instance._events = {}
- instance._added_perform_callback = False
- instance._timeout = None
- instance._closed = False
- cls._ASYNC_CLIENTS[io_loop] = instance
- return instance
-
- def close(self):
- """Destroys this http client, freeing any file descriptors used.
- Not needed in normal use, but may be helpful in unittests that
- create and destroy http clients. No other methods may be called
- on the AsyncHTTPClient after close().
- """
- del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
- for curl in self._curls:
- curl.close()
- self._multi.close()
- self._closed = True
-
- def fetch(self, request, callback, **kwargs):
- """Executes an HTTPRequest, calling callback with an HTTPResponse.
-
- If an error occurs during the fetch, the HTTPResponse given to the
- callback has a non-None error attribute that contains the exception
- encountered during the request. You can call response.reraise() to
- throw the exception (if any) in the callback.
- """
- if not isinstance(request, HTTPRequest):
- request = HTTPRequest(url=request, **kwargs)
- self._requests.append((request, callback))
- self._add_perform_callback()
-
- def _add_perform_callback(self):
- if not self._added_perform_callback:
- self.io_loop.add_callback(self._perform)
- self._added_perform_callback = True
-
- def _handle_events(self, fd, events):
- self._events[fd] = events
- self._add_perform_callback()
-
- def _handle_timeout(self):
- self._timeout = None
- self._perform()
-
- def _perform(self):
- self._added_perform_callback = False
-
- if self._closed:
- return
-
- while True:
- while True:
- ret, num_handles = self._multi.perform()
- if ret != pycurl.E_CALL_MULTI_PERFORM:
- break
-
- # Update the set of active file descriptors. It is important
- # that this happen immediately after perform() because
- # fds that have been removed from fdset are free to be reused
- # in user callbacks.
- fds = {}
- (readable, writable, exceptable) = self._multi.fdset()
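- # The masks below appear to mirror tornado.ioloop's epoll-style
- # constants: 0x1|0x2 is EPOLLIN|EPOLLPRI (readable), 0x4 is
- # EPOLLOUT (writable), and 0x8|0x10 is EPOLLERR|EPOLLHUP (errors).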
- for fd in readable:
- fds[fd] = fds.get(fd, 0) | 0x1 | 0x2
- for fd in writable:
- fds[fd] = fds.get(fd, 0) | 0x4
- for fd in exceptable:
- fds[fd] = fds.get(fd, 0) | 0x8 | 0x10
-
- if fds and max(fds.iterkeys()) > 900:
- # Libcurl has a bug in which it behaves unpredictably with
- # file descriptors greater than 1024. (This is because
- # even though it uses poll() instead of select(), it still
- # uses FD_SET internally.) We check against 900 rather than
- # 1024 to leave a safety margin. Since curl opens its own
- # file descriptors we can't catch this problem when it
- # happens, and the best we can do is detect that it's about
- # to happen. Exiting is a lousy way to handle this error,
- # but there's not much we can do at this point. Exiting
- # (and getting restarted by whatever monitoring process
- # is handling crashed tornado processes) will at least
- # get things working again and hopefully bring the issue
- # to someone's attention.
- # If you run into this issue, you either have a file descriptor
- # leak or need to run more tornado processes (so that none
- # of them is handling more than 1000 simultaneous connections).
- print >> sys.stderr, "ERROR: File descriptor too high for libcurl. Exiting."
- logging.error("File descriptor too high for libcurl. Exiting.")
- sys.exit(1)
-
- for fd in self._fds:
- if fd not in fds:
- try:
- self.io_loop.remove_handler(fd)
- except (OSError, IOError), e:
- if e[0] != errno.ENOENT:
- raise
-
- for fd, events in fds.iteritems():
- old_events = self._fds.get(fd, None)
- if old_events is None:
- self.io_loop.add_handler(fd, self._handle_events, events)
- elif old_events != events:
- try:
- self.io_loop.update_handler(fd, events)
- except (OSError, IOError), e:
- if e[0] == errno.ENOENT:
- self.io_loop.add_handler(fd, self._handle_events,
- events)
- else:
- raise
- self._fds = fds
-
- # Handle completed fetches
- completed = 0
- while True:
- num_q, ok_list, err_list = self._multi.info_read()
- for curl in ok_list:
- self._finish(curl)
- completed += 1
- for curl, errnum, errmsg in err_list:
- self._finish(curl, errnum, errmsg)
- completed += 1
- if num_q == 0:
- break
-
- # Start fetching new URLs
- started = 0
- while self._free_list and self._requests:
- started += 1
- curl = self._free_list.pop()
- (request, callback) = self._requests.popleft()
- curl.info = {
- "headers": httputil.HTTPHeaders(),
- "buffer": cStringIO.StringIO(),
- "request": request,
- "callback": callback,
- "start_time": time.time(),
- }
- _curl_setup_request(curl, request, curl.info["buffer"],
- curl.info["headers"])
- self._multi.add_handle(curl)
-
- if not started and not completed:
- break
-
- if self._timeout is not None:
- self.io_loop.remove_timeout(self._timeout)
- self._timeout = None
-
- if num_handles:
- self._timeout = self.io_loop.add_timeout(
- time.time() + 0.2, self._handle_timeout)
-
- def _finish(self, curl, curl_error=None, curl_message=None):
- info = curl.info
- curl.info = None
- self._multi.remove_handle(curl)
- self._free_list.append(curl)
- buffer = info["buffer"]
- if curl_error:
- error = CurlError(curl_error, curl_message)
- code = error.code
- body = None
- effective_url = None
- buffer.close()
- buffer = None
- else:
- error = None
- code = curl.getinfo(pycurl.HTTP_CODE)
- effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
- buffer.seek(0)
- try:
- info["callback"](HTTPResponse(
- request=info["request"], code=code, headers=info["headers"],
- buffer=buffer, effective_url=effective_url, error=error,
- request_time=time.time() - info["start_time"]))
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- logging.error("Exception in callback %r", info["callback"],
- exc_info=True)
-
-
-class AsyncHTTPClient2(object):
- """Alternate implementation of AsyncHTTPClient.
-
- This class has the same interface as AsyncHTTPClient (so see that class
- for usage documentation) but is implemented with a different set of
- libcurl APIs (curl_multi_socket_action instead of fdset/perform).
- This implementation will likely become the default in the future, but
- for now should be considered somewhat experimental.
-
- The main advantage of this class over the original implementation is
- that it is immune to the fd > 1024 bug, so applications with a large
- number of simultaneous requests (e.g. long-polling) may prefer this
- version.
-
- Known bugs:
- * Timeouts connecting to localhost
- In some situations, this implementation will return a connection
- timeout when the old implementation would be able to connect. This
- has only been observed when connecting to localhost when using
- the kqueue-based IOLoop (mac/bsd), but it may also occur on epoll (linux)
- and, in principle, for non-localhost sites.
- While the bug is unrelated to IPv6, disabling IPv6 will avoid the
- most common manifestations of the bug, so this class disables IPv6 when
- it detects an affected version of libcurl.
- The underlying cause is a libcurl bug in versions up to and including
- 7.21.0 (it will be fixed in the not-yet-released 7.21.1):
- http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
- """
- _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
-
- def __new__(cls, io_loop=None, max_clients=10,
- max_simultaneous_connections=None):
- # There is one client per IOLoop since they share curl instances
- io_loop = io_loop or ioloop.IOLoop.instance()
- if io_loop in cls._ASYNC_CLIENTS:
- return cls._ASYNC_CLIENTS[io_loop]
- else:
- instance = super(AsyncHTTPClient2, cls).__new__(cls)
- instance.io_loop = io_loop
- instance._multi = pycurl.CurlMulti()
instance._multi.setopt(pycurl.M_TIMERFUNCTION,
instance._set_timeout)
instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
create and destroy http clients. No other methods may be called
on the AsyncHTTPClient after close().
"""
- del AsyncHTTPClient2._ASYNC_CLIENTS[self.io_loop]
+ del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
for curl in self._curls:
curl.close()
self._multi.close()
logging.error("Exception in callback %r", info["callback"],
exc_info=True)
+# For backwards compatibility: Tornado 1.0 included a new implementation of
+# AsyncHTTPClient that has since replaced the original. Define an alias
+# so that code written against AsyncHTTPClient2 continues to work.
+AsyncHTTPClient2 = AsyncHTTPClient
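+# For example, existing (hypothetical) calls such as
+#     AsyncHTTPClient2().fetch("http://example.com/", handle_response)
+# keep working and now use the replacement implementation.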
class HTTPRequest(object):
def __init__(self, url, method="GET", headers=None, body=None,