From: Ben Darnell
Date: Fri, 23 Jul 2010 18:12:56 +0000 (-0700)
Subject: Remove the old AsyncHTTPClient (fdset) and make AsyncHTTPClient2
X-Git-Tag: v1.1.0~66
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=94f073b7a209d04b463ea3098ab89bbabb8ba7ca;p=thirdparty%2Ftornado.git

Remove the old AsyncHTTPClient (fdset) and make AsyncHTTPClient2
(socket_action) the standard.
---

diff --git a/tornado/httpclient.py b/tornado/httpclient.py
index 4d97eebe4..94bd98c57 100644
--- a/tornado/httpclient.py
+++ b/tornado/httpclient.py
@@ -115,237 +115,6 @@ class AsyncHTTPClient(object):
             instance = super(AsyncHTTPClient, cls).__new__(cls)
             instance.io_loop = io_loop
             instance._multi = pycurl.CurlMulti()
-            instance._curls = [_curl_create(max_simultaneous_connections)
-                               for i in xrange(max_clients)]
-            instance._free_list = instance._curls[:]
-            instance._requests = collections.deque()
-            instance._fds = {}
-            instance._events = {}
-            instance._added_perform_callback = False
-            instance._timeout = None
-            instance._closed = False
-            cls._ASYNC_CLIENTS[io_loop] = instance
-            return instance
-
-    def close(self):
-        """Destroys this http client, freeing any file descriptors used.
-        Not needed in normal use, but may be helpful in unittests that
-        create and destroy http clients.  No other methods may be called
-        on the AsyncHTTPClient after close().
-        """
-        del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
-        for curl in self._curls:
-            curl.close()
-        self._multi.close()
-        self._closed = True
-
-    def fetch(self, request, callback, **kwargs):
-        """Executes an HTTPRequest, calling callback with an HTTPResponse.
-
-        If an error occurs during the fetch, the HTTPResponse given to the
-        callback has a non-None error attribute that contains the exception
-        encountered during the request.  You can call response.reraise() to
-        throw the exception (if any) in the callback.
-        """
-        if not isinstance(request, HTTPRequest):
-            request = HTTPRequest(url=request, **kwargs)
-        self._requests.append((request, callback))
-        self._add_perform_callback()
-
-    def _add_perform_callback(self):
-        if not self._added_perform_callback:
-            self.io_loop.add_callback(self._perform)
-            self._added_perform_callback = True
-
-    def _handle_events(self, fd, events):
-        self._events[fd] = events
-        self._add_perform_callback()
-
-    def _handle_timeout(self):
-        self._timeout = None
-        self._perform()
-
-    def _perform(self):
-        self._added_perform_callback = False
-
-        if self._closed:
-            return
-
-        while True:
-            while True:
-                ret, num_handles = self._multi.perform()
-                if ret != pycurl.E_CALL_MULTI_PERFORM:
-                    break
-
-            # Update the set of active file descriptors.  It is important
-            # that this happen immediately after perform() because
-            # fds that have been removed from fdset are free to be reused
-            # in user callbacks.
-            fds = {}
-            (readable, writable, exceptable) = self._multi.fdset()
-            for fd in readable:
-                fds[fd] = fds.get(fd, 0) | 0x1 | 0x2
-            for fd in writable:
-                fds[fd] = fds.get(fd, 0) | 0x4
-            for fd in exceptable:
-                fds[fd] = fds.get(fd, 0) | 0x8 | 0x10
-
-            if fds and max(fds.iterkeys()) > 900:
-                # Libcurl has a bug in which it behaves unpredictably with
-                # file descriptors greater than 1024.  (This is because
-                # even though it uses poll() instead of select(), it still
-                # uses FD_SET internally)  Since curl opens its own file
-                # descriptors we can't catch this problem when it happens,
-                # and the best we can do is detect that it's about to
-                # happen.  Exiting is a lousy way to handle this error,
-                # but there's not much we can do at this point.  Exiting
-                # (and getting restarted by whatever monitoring process
-                # is handling crashed tornado processes) will at least
-                # get things working again and hopefully bring the issue
-                # to someone's attention.
-                # If you run into this issue, you either have a file descriptor
-                # leak or need to run more tornado processes (so that none
-                # of them are handling more than 1000 simultaneous connections)
-                print >> sys.stderr, "ERROR: File descriptor too high for libcurl. Exiting."
-                logging.error("File descriptor too high for libcurl. Exiting.")
-                sys.exit(1)
-
-            for fd in self._fds:
-                if fd not in fds:
-                    try:
-                        self.io_loop.remove_handler(fd)
-                    except (OSError, IOError), e:
-                        if e[0] != errno.ENOENT:
-                            raise
-
-            for fd, events in fds.iteritems():
-                old_events = self._fds.get(fd, None)
-                if old_events is None:
-                    self.io_loop.add_handler(fd, self._handle_events, events)
-                elif old_events != events:
-                    try:
-                        self.io_loop.update_handler(fd, events)
-                    except (OSError, IOError), e:
-                        if e[0] == errno.ENOENT:
-                            self.io_loop.add_handler(fd, self._handle_events,
-                                                     events)
-                        else:
-                            raise
-            self._fds = fds
-
-            # Handle completed fetches
-            completed = 0
-            while True:
-                num_q, ok_list, err_list = self._multi.info_read()
-                for curl in ok_list:
-                    self._finish(curl)
-                    completed += 1
-                for curl, errnum, errmsg in err_list:
-                    self._finish(curl, errnum, errmsg)
-                    completed += 1
-                if num_q == 0:
-                    break
-
-            # Start fetching new URLs
-            started = 0
-            while self._free_list and self._requests:
-                started += 1
-                curl = self._free_list.pop()
-                (request, callback) = self._requests.popleft()
-                curl.info = {
-                    "headers": httputil.HTTPHeaders(),
-                    "buffer": cStringIO.StringIO(),
-                    "request": request,
-                    "callback": callback,
-                    "start_time": time.time(),
-                }
-                _curl_setup_request(curl, request, curl.info["buffer"],
-                                    curl.info["headers"])
-                self._multi.add_handle(curl)
-
-            if not started and not completed:
-                break
-
-        if self._timeout is not None:
-            self.io_loop.remove_timeout(self._timeout)
-            self._timeout = None
-
-        if num_handles:
-            self._timeout = self.io_loop.add_timeout(
-                time.time() + 0.2, self._handle_timeout)
-
-    def _finish(self, curl, curl_error=None, curl_message=None):
-        info = curl.info
-        curl.info = None
-        self._multi.remove_handle(curl)
-        self._free_list.append(curl)
-        buffer = info["buffer"]
-        if curl_error:
-            error = CurlError(curl_error, curl_message)
-            code = error.code
-            body = None
-            effective_url = None
-            buffer.close()
-            buffer = None
-        else:
-            error = None
-            code = curl.getinfo(pycurl.HTTP_CODE)
-            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
-            buffer.seek(0)
-        try:
-            info["callback"](HTTPResponse(
-                request=info["request"], code=code, headers=info["headers"],
-                buffer=buffer, effective_url=effective_url, error=error,
-                request_time=time.time() - info["start_time"]))
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            logging.error("Exception in callback %r", info["callback"],
-                          exc_info=True)
-
-
-class AsyncHTTPClient2(object):
-    """Alternate implementation of AsyncHTTPClient.
-
-    This class has the same interface as AsyncHTTPClient (so see that class
-    for usage documentation) but is implemented with a different set of
-    libcurl APIs (curl_multi_socket_action instead of fdset/perform).
-    This implementation will likely become the default in the future, but
-    for now should be considered somewhat experimental.
-
-    The main advantage of this class over the original implementation is
-    that it is immune to the fd > 1024 bug, so applications with a large
-    number of simultaneous requests (e.g. long-polling) may prefer this
-    version.
-
-    Known bugs:
-    * Timeouts connecting to localhost
-      In some situations, this implementation will return a connection
-      timeout when the old implementation would be able to connect.  This
-      has only been observed when connecting to localhost when using
-      the kqueue-based IOLoop (mac/bsd), but it may also occur on epoll
-      (linux) and, in principle, for non-localhost sites.
-      While the bug is unrelated to IPv6, disabling IPv6 will avoid the
-      most common manifestations of the bug, so this class disables IPv6
-      when it detects an affected version of libcurl.
-      The underlying cause is a libcurl bug in versions up to and including
-      7.21.0 (it will be fixed in the not-yet-released 7.21.1)
-      http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
-    """
-    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
-
-    def __new__(cls, io_loop=None, max_clients=10,
-                max_simultaneous_connections=None):
-        # There is one client per IOLoop since they share curl instances
-        io_loop = io_loop or ioloop.IOLoop.instance()
-        if io_loop in cls._ASYNC_CLIENTS:
-            return cls._ASYNC_CLIENTS[io_loop]
-        else:
-            instance = super(AsyncHTTPClient2, cls).__new__(cls)
-            instance.io_loop = io_loop
-            instance._multi = pycurl.CurlMulti()
             instance._multi.setopt(pycurl.M_TIMERFUNCTION,
                                    instance._set_timeout)
             instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
@@ -365,7 +134,7 @@ class AsyncHTTPClient2(object):
         create and destroy http clients.  No other methods may be called
         on the AsyncHTTPClient after close().
         """
-        del AsyncHTTPClient2._ASYNC_CLIENTS[self.io_loop]
+        del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
         for curl in self._curls:
             curl.close()
         self._multi.close()
@@ -529,6 +298,10 @@ class AsyncHTTPClient2(object):
         logging.error("Exception in callback %r", info["callback"],
                       exc_info=True)
 
+# For backwards compatibility: Tornado 1.0 included a new implementation of
+# AsyncHTTPClient that has since replaced the original.  Define an alias
+# so anything that used AsyncHTTPClient2 still works
+AsyncHTTPClient2 = AsyncHTTPClient
 
 class HTTPRequest(object):
     def __init__(self, url, method="GET", headers=None, body=None,
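
Two things are worth noting about this change. First, both the removed and
the surviving implementation use the same per-IOLoop singleton pattern in
__new__: curl instances are shared, so there is exactly one client per
IOLoop, held in a WeakKeyDictionary.  A minimal standalone sketch of that
pattern (the class name PerLoopClient is illustrative, not from the commit):

    import weakref

    class PerLoopClient(object):
        # One instance per io_loop; the WeakKeyDictionary entry goes
        # away when the IOLoop itself is garbage collected.
        _INSTANCES = weakref.WeakKeyDictionary()

        def __new__(cls, io_loop):
            if io_loop in cls._INSTANCES:
                return cls._INSTANCES[io_loop]
            instance = super(PerLoopClient, cls).__new__(cls)
            instance.io_loop = io_loop
            cls._INSTANCES[io_loop] = instance
            return instance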
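
Second, after this commit AsyncHTTPClient is the socket_action-based client
and AsyncHTTPClient2 is only an alias for it, so code written against either
name gets the same class, and therefore the same per-IOLoop instance.  A
minimal usage sketch under that assumption (the URL and the callback name
are illustrative):

    from tornado import httpclient, ioloop

    def handle_response(response):
        # response.error is non-None if the fetch failed
        if response.error:
            print "Error:", response.error
        else:
            print response.body
        ioloop.IOLoop.instance().stop()

    client = httpclient.AsyncHTTPClient()
    assert client is httpclient.AsyncHTTPClient2()  # alias, same singleton
    client.fetch("http://www.tornadoweb.org/", handle_response)
    ioloop.IOLoop.instance().start()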