exc_info=True)
+class AsyncHTTPClient2(object):
+ """Alternate implementation of AsyncHTTPClient.
+
+ This class has the same interface as AsyncHTTPClient (so see that class
+ for usage documentation) but is implemented with a different set of
+ libcurl APIs (curl_multi_socket_action instead of fdset/perform).
+ This implementation will likely become the default in the future, but
+ for now should be considered somewhat experimental.
+
+ The main advantage of this class over the original implementation is
+ that it is immune to the fd > 1024 bug, so applications with a large
+ number of simultaneous requests (e.g. long-polling) may prefer this
+ version.
+
+ Known bugs:
+ * Timeouts connecting to localhost
+ In some situations, this implementation will return a connection
+ timeout when the old implementation would be able to connect. This
+ has only been observed when connecting to localhost when using
+ the kqueue-based IOLoop (mac/bsd), but it may also occur on epoll (linux)
+ and, in principle, for non-localhost sites.
+ While the bug is unrelated to IPv6, disabling IPv6 will avoid the
+ most common manifestations of the bug (use a prepare_curl_callback that
+ calls curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)).
+ The underlying cause is a libcurl bug that has been confirmed to be
+ present in versions 7.20.0 and 7.21.0:
+ http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
+ """
+ _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
+
+ def __new__(cls, io_loop=None, max_clients=10,
+ max_simultaneous_connections=None):
+ # There is one client per IOLoop since they share curl instances
+ io_loop = io_loop or ioloop.IOLoop.instance()
+ if io_loop in cls._ASYNC_CLIENTS:
+ return cls._ASYNC_CLIENTS[io_loop]
+ else:
+ instance = super(AsyncHTTPClient2, cls).__new__(cls)
+ instance.io_loop = io_loop
+ instance._multi = pycurl.CurlMulti()
+ instance._multi.setopt(pycurl.M_TIMERFUNCTION,
+ instance._handle_timer)
+ instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
+ instance._handle_socket)
+ instance._curls = [_curl_create(max_simultaneous_connections)
+ for i in xrange(max_clients)]
+ instance._free_list = instance._curls[:]
+ instance._requests = collections.deque()
+ instance._fds = {}
+ cls._ASYNC_CLIENTS[io_loop] = instance
+ return instance
+
+ def close(self):
+ """Destroys this http client, freeing any file descriptors used.
+ Not needed in normal use, but may be helpful in unittests that
+ create and destroy http clients. No other methods may be called
+ on the AsyncHTTPClient after close().
+ """
+ del AsyncHTTPClient2._ASYNC_CLIENTS[self.io_loop]
+ for curl in self._curls:
+ curl.close()
+ self._multi.close()
+ self._closed = True
+
+ def fetch(self, request, callback, **kwargs):
+ """Executes an HTTPRequest, calling callback with an HTTPResponse.
+
+ If an error occurs during the fetch, the HTTPResponse given to the
+ callback has a non-None error attribute that contains the exception
+ encountered during the request. You can call response.reraise() to
+ throw the exception (if any) in the callback.
+ """
+ if not isinstance(request, HTTPRequest):
+ request = HTTPRequest(url=request, **kwargs)
+ self._requests.append((request, callback))
+ self._process_queue()
+ self.io_loop.add_callback(self._handle_timeout)
+
+ def _handle_socket(self, event, fd, multi, data):
+ """Called by libcurl when it wants to change the file descriptors
+ it cares about.
+ """
+ event_map = {
+ pycurl.POLL_NONE: ioloop.IOLoop.NONE,
+ pycurl.POLL_IN: ioloop.IOLoop.READ,
+ pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
+ pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
+ }
+ if event == pycurl.POLL_REMOVE:
+ self.io_loop.remove_handler(fd)
+ del self._fds[fd]
+ else:
+ ioloop_event = event_map[event]
+ if fd not in self._fds:
+ self._fds[fd] = ioloop_event
+ self.io_loop.add_handler(fd, self._handle_events,
+ ioloop_event)
+ else:
+ self._fds[fd] = ioloop_event
+ self.io_loop.update_handler(fd, ioloop_event)
+
+ def _handle_timer(self, msecs):
+ """Called by libcurl to schedule a timeout."""
+ self.io_loop.add_timeout(
+ time.time() + msecs/1000.0, self._handle_timeout)
+
+ def _handle_events(self, fd, events):
+ """Called by IOLoop when there is activity on one of our
+ file descriptors.
+ """
+ action = 0
+ if events & ioloop.IOLoop.READ: action |= pycurl.CSELECT_IN
+ if events & ioloop.IOLoop.WRITE: action |= pycurl.CSELECT_OUT
+ while True:
+ try:
+ ret, num_handles = self._multi.socket_action(fd, action)
+ except Exception, e:
+ ret = e[0]
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+ self._finish_pending_requests()
+
+ def _handle_timeout(self):
+ """Called by IOLoop when the requested timeout has passed."""
+ while True:
+ try:
+ ret, num_handles = self._multi.socket_action(
+ pycurl.SOCKET_TIMEOUT, 0)
+ except Exception, e:
+ ret = e[0]
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+ self._finish_pending_requests()
+
+ def _finish_pending_requests(self):
+ """Process any requests that were completed by the last
+ call to multi.socket_action.
+ """
+ while True:
+ num_q, ok_list, err_list = self._multi.info_read()
+ for curl in ok_list:
+ self._finish(curl)
+ for curl, errnum, errmsg in err_list:
+ self._finish(curl, errnum, errmsg)
+ if num_q == 0:
+ break
+ self._process_queue()
+
+ def _process_queue(self):
+ while True:
+ started = 0
+ while self._free_list and self._requests:
+ started += 1
+ curl = self._free_list.pop()
+ (request, callback) = self._requests.popleft()
+ curl.info = {
+ "headers": {},
+ "buffer": cStringIO.StringIO(),
+ "request": request,
+ "callback": callback,
+ "start_time": time.time(),
+ }
+ _curl_setup_request(curl, request, curl.info["buffer"],
+ curl.info["headers"])
+ self._multi.add_handle(curl)
+
+ if not started:
+ break
+
+ def _finish(self, curl, curl_error=None, curl_message=None):
+ info = curl.info
+ curl.info = None
+ self._multi.remove_handle(curl)
+ self._free_list.append(curl)
+ buffer = info["buffer"]
+ if curl_error:
+ error = CurlError(curl_error, curl_message)
+ code = error.code
+ effective_url = None
+ buffer.close()
+ buffer = None
+ else:
+ error = None
+ code = curl.getinfo(pycurl.HTTP_CODE)
+ effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
+ buffer.seek(0)
+ try:
+ info["callback"](HTTPResponse(
+ request=info["request"], code=code, headers=info["headers"],
+ buffer=buffer, effective_url=effective_url, error=error,
+ request_time=time.time() - info["start_time"]))
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ logging.error("Exception in callback %r", info["callback"],
+ exc_info=True)
+
+
class HTTPRequest(object):
def __init__(self, url, method="GET", headers={}, body=None,
auth_username=None, auth_password=None,