# Patch to tornado/httpclient.py (class inserted after AsyncHTTPClient).
# From: Ben Darnell
# Date: Sun, 20 Jun 2010 02:38:02 +0000 (-0700)
# Subject: Add a second implementation of AsyncHTTPClient, using a new
#     libcurl api.
# Adapted from Jacob Kristhammar's work:
#     http://github.com/sris/tornado/blob/master/tornado/httpclient2.py


class AsyncHTTPClient2(object):
    """Alternate implementation of AsyncHTTPClient.

    This class has the same interface as AsyncHTTPClient (so see that class
    for usage documentation) but is implemented with a different set of
    libcurl APIs (curl_multi_socket_action instead of fdset/perform).
    This implementation will likely become the default in the future, but
    for now should be considered somewhat experimental.

    The main advantage of this class over the original implementation is
    that it is immune to the fd > 1024 bug, so applications with a large
    number of simultaneous requests (e.g. long-polling) may prefer this
    version.

    Known bugs:
    * Timeouts connecting to localhost
    In some situations, this implementation will return a connection
    timeout when the old implementation would be able to connect.  This
    has only been observed when connecting to localhost when using
    the kqueue-based IOLoop (mac/bsd), but it may also occur on epoll (linux)
    and, in principle, for non-localhost sites.
    While the bug is unrelated to IPv6, disabling IPv6 will avoid the
    most common manifestations of the bug (use a prepare_curl_callback that
    calls curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)).
    The underlying cause is a libcurl bug that has been confirmed to be
    present in versions 7.20.0 and 7.21.0:
    http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
    """
    # Maps each IOLoop to its single client instance.
    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()

    def __new__(cls, io_loop=None, max_clients=10,
                max_simultaneous_connections=None):
        """Return the client bound to io_loop, creating it on first use.

        max_clients is the number of curl handles created up front (and so
        the number of requests that can be in flight at once);
        max_simultaneous_connections is forwarded to _curl_create.  Both
        are only honored by the call that actually creates the instance.
        """
        # There is one client per IOLoop since they share curl instances
        io_loop = io_loop or ioloop.IOLoop.instance()
        if io_loop in cls._ASYNC_CLIENTS:
            return cls._ASYNC_CLIENTS[io_loop]

        instance = super(AsyncHTTPClient2, cls).__new__(cls)
        instance.io_loop = io_loop
        # State read by the libcurl callbacks is set before they are
        # registered, so a callback can never see a half-built instance.
        instance._closed = False
        instance._timeout = None  # handle of the pending IOLoop timeout
        instance._fds = {}        # fd -> IOLoop event mask currently registered
        instance._requests = collections.deque()
        instance._multi = pycurl.CurlMulti()
        instance._multi.setopt(pycurl.M_TIMERFUNCTION,
                               instance._handle_timer)
        instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
                               instance._handle_socket)
        instance._curls = [_curl_create(max_simultaneous_connections)
                           for _ in range(max_clients)]
        instance._free_list = instance._curls[:]
        cls._ASYNC_CLIENTS[io_loop] = instance
        return instance

    def close(self):
        """Destroys this http client, freeing any file descriptors used.
        Not needed in normal use, but may be helpful in unittests that
        create and destroy http clients.  No other methods may be called
        on the AsyncHTTPClient after close().
        """
        # pop() rather than del so that a repeated close() is harmless.
        AsyncHTTPClient2._ASYNC_CLIENTS.pop(self.io_loop, None)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        # Do not leave a timer behind that would poke the closed multi handle.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None
        self._closed = True

    def fetch(self, request, callback, **kwargs):
        """Executes an HTTPRequest, calling callback with an HTTPResponse.

        If an error occurs during the fetch, the HTTPResponse given to the
        callback has a non-None error attribute that contains the exception
        encountered during the request. You can call response.reraise() to
        throw the exception (if any) in the callback.

        request may be an HTTPRequest or a URL; in the latter case an
        HTTPRequest is built from the URL and **kwargs.
        """
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        self._requests.append((request, callback))
        self._process_queue()
        # Kick libcurl on the next IOLoop iteration so that the newly added
        # handle actually starts.
        self.io_loop.add_callback(self._handle_timeout)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        if event == pycurl.POLL_REMOVE:
            # Only unregister fds we registered ourselves; a remove for an
            # unknown fd used to raise KeyError inside the libcurl callback.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
            return

        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE,
        }
        ioloop_event = event_map[event]
        known = fd in self._fds
        self._fds[fd] = ioloop_event
        if known:
            self.io_loop.update_handler(fd, ioloop_event)
        else:
            self.io_loop.add_handler(fd, self._handle_events, ioloop_event)

    def _handle_timer(self, msecs):
        """Called by libcurl to schedule a timeout.

        libcurl only ever wants one timer: each call replaces the previous
        request, and a negative value (-1) means "delete the timer".
        """
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None
        if msecs < 0:
            return
        self._timeout = self.io_loop.add_timeout(
            time.time() + msecs / 1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        self._socket_action(fd, action)
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        # The scheduled timeout (if any) is considered consumed.  Clear the
        # handle *before* calling into libcurl, which may install a new one
        # through _handle_timer while socket_action runs.
        self._timeout = None
        self._socket_action(pycurl.SOCKET_TIMEOUT, 0)
        self._finish_pending_requests()

    def _socket_action(self, fd, action):
        """Call multi.socket_action until libcurl stops asking for a retry."""
        # sys.exc_info() is used instead of "except ..., e" / "except ... as e"
        # so this parses on every Python version the file may run under.
        import sys
        while True:
            try:
                ret, _ = self._multi.socket_action(fd, action)
            except pycurl.error:
                # pycurl.error carries (errno, message); only libcurl errors
                # are expected here, anything else should propagate.
                ret = sys.exc_info()[1].args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Finished handles went back on the free list; start queued requests.
        self._process_queue()

    def _process_queue(self):
        """Start queued requests for as long as idle curl handles remain."""
        while self._free_list and self._requests:
            curl = self._free_list.pop()
            (request, callback) = self._requests.popleft()
            curl.info = {
                "headers": {},
                "buffer": cStringIO.StringIO(),
                "request": request,
                "callback": callback,
                "start_time": time.time(),
            }
            _curl_setup_request(curl, request, curl.info["buffer"],
                                curl.info["headers"])
            self._multi.add_handle(curl)

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Recycle a completed curl handle and invoke the request callback.

        curl_error / curl_message are the libcurl error number and message
        for a failed transfer; both are None on success.
        """
        info = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        body = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            body.close()
            body = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            body.seek(0)
        try:
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=body, effective_url=effective_url, error=error,
                request_time=time.time() - info["start_time"]))
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # A misbehaving user callback must not take the IOLoop down.
            logging.error("Exception in callback %r", info["callback"],
                          exc_info=True)


# (Unchanged diff context follows in tornado/httpclient.py: class HTTPRequest.)