Add a second implementation of AsyncHTTPClient, using a new libcurl API.
author Ben Darnell <bdarnell@beaker.local>
Sun, 20 Jun 2010 02:38:02 +0000 (19:38 -0700)
committer Ben Darnell <bdarnell@beaker.local>
Sun, 20 Jun 2010 02:55:40 +0000 (19:55 -0700)
Adapted from Jacob Kristhammar's work:
http://github.com/sris/tornado/blob/master/tornado/httpclient2.py
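
For reference, a minimal usage sketch (not part of this patch): it drives a
single fetch through the new client and applies the IPv4-only workaround
described in the class docstring below.  The URL, the response handler, and
the IOLoop start/stop calls are illustrative assumptions rather than anything
this commit adds.

    import pycurl
    from tornado import httpclient, ioloop

    def force_ipv4(curl):
        # Workaround for the localhost connection-timeout bug described
        # in the AsyncHTTPClient2 docstring.
        curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)

    def handle_response(response):
        # response.error is set when the fetch failed; otherwise the body
        # is available on the response object.
        if response.error:
            print "Error:", response.error
        else:
            print response.body
        ioloop.IOLoop.instance().stop()

    client = httpclient.AsyncHTTPClient2()
    request = httpclient.HTTPRequest("http://localhost:8888/",
                                     prepare_curl_callback=force_ipv4)
    client.fetch(request, handle_response)
    ioloop.IOLoop.instance().start()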

tornado/httpclient.py

index 91ac123deff33f2f193b11220a7f085d41d41a26..730853863134d2a2a98fc975a0fc6d1db26159bc 100644
@@ -300,6 +300,204 @@ class AsyncHTTPClient(object):
                           exc_info=True)
 
 
+class AsyncHTTPClient2(object):
+    """Alternate implementation of AsyncHTTPClient.
+
+    This class has the same interface as AsyncHTTPClient (so see that class
+    for usage documentation) but is implemented with a different set of
+    libcurl APIs (curl_multi_socket_action instead of fdset/perform).
+    This implementation will likely become the default in the future, but
+    for now should be considered somewhat experimental.
+
+    The main advantage of this class over the original implementation is
+    that it is immune to the fd > 1024 bug, so applications with a large
+    number of simultaneous requests (e.g. long-polling) may prefer this
+    version.
+
+    Known bugs:
+    * Timeouts connecting to localhost
+    In some situations, this implementation will return a connection
+    timeout when the old implementation would be able to connect.  This
+    has only been observed when connecting to localhost while using
+    the kqueue-based IOLoop (Mac/BSD), but it may also occur with epoll
+    (Linux) and, in principle, for non-localhost sites.
+    While the bug is unrelated to IPv6, disabling IPv6 will avoid the
+    most common manifestations of the bug (use a prepare_curl_callback that
+    calls curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)).
+    The underlying cause is a libcurl bug that has been confirmed to be
+    present in versions 7.20.0 and 7.21.0:
+    http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
+    """
+    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
+
+    def __new__(cls, io_loop=None, max_clients=10,
+                max_simultaneous_connections=None):
+        # There is one client per IOLoop since they share curl instances
+        io_loop = io_loop or ioloop.IOLoop.instance()
+        if io_loop in cls._ASYNC_CLIENTS:
+            return cls._ASYNC_CLIENTS[io_loop]
+        else:
+            instance = super(AsyncHTTPClient2, cls).__new__(cls)
+            instance.io_loop = io_loop
+            instance._multi = pycurl.CurlMulti()
+            instance._multi.setopt(pycurl.M_TIMERFUNCTION,
+                                   instance._handle_timer)
+            instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
+                                   instance._handle_socket)
+            instance._curls = [_curl_create(max_simultaneous_connections)
+                               for i in xrange(max_clients)]
+            instance._free_list = instance._curls[:]
+            instance._requests = collections.deque()
+            instance._fds = {}
+            cls._ASYNC_CLIENTS[io_loop] = instance
+            return instance
+
+    def close(self):
+        """Destroys this http client, freeing any file descriptors used.
+        Not needed in normal use, but may be helpful in unittests that
+        create and destroy http clients.  No other methods may be called
+        on the AsyncHTTPClient after close().
+        """
+        del AsyncHTTPClient2._ASYNC_CLIENTS[self.io_loop]
+        for curl in self._curls:
+            curl.close()
+        self._multi.close()
+        self._closed = True
+
+    def fetch(self, request, callback, **kwargs):
+        """Executes an HTTPRequest, calling callback with an HTTPResponse.
+
+        If an error occurs during the fetch, the HTTPResponse given to the
+        callback has a non-None error attribute that contains the exception
+        encountered during the request. You can call response.reraise() to
+        throw the exception (if any) in the callback.
+        """
+        if not isinstance(request, HTTPRequest):
+            request = HTTPRequest(url=request, **kwargs)
+        self._requests.append((request, callback))
+        self._process_queue()
+        self.io_loop.add_callback(self._handle_timeout)
+
+    def _handle_socket(self, event, fd, multi, data):
+        """Called by libcurl when it wants to change the file descriptors
+        it cares about.
+        """
+        event_map = {
+            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
+            pycurl.POLL_IN: ioloop.IOLoop.READ,
+            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
+            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
+        }
+        if event == pycurl.POLL_REMOVE:
+            self.io_loop.remove_handler(fd)
+            del self._fds[fd]
+        else:
+            ioloop_event = event_map[event]
+            if fd not in self._fds:
+                self._fds[fd] = ioloop_event
+                self.io_loop.add_handler(fd, self._handle_events,
+                                         ioloop_event)
+            else:
+                self._fds[fd] = ioloop_event
+                self.io_loop.update_handler(fd, ioloop_event)
+
+    def _handle_timer(self, msecs):
+        """Called by libcurl to schedule a timeout."""
+        self.io_loop.add_timeout(
+            time.time() + msecs/1000.0, self._handle_timeout)
+
+    def _handle_events(self, fd, events):
+        """Called by IOLoop when there is activity on one of our
+        file descriptors.
+        """
+        action = 0
+        if events & ioloop.IOLoop.READ: action |= pycurl.CSELECT_IN
+        if events & ioloop.IOLoop.WRITE: action |= pycurl.CSELECT_OUT
+        while True:
+            try:
+                ret, num_handles = self._multi.socket_action(fd, action)
+            except Exception, e:
+                ret = e[0]
+            if ret != pycurl.E_CALL_MULTI_PERFORM:
+                break
+        self._finish_pending_requests()
+
+    def _handle_timeout(self):
+        """Called by IOLoop when the requested timeout has passed."""
+        while True:
+            try:
+                ret, num_handles = self._multi.socket_action(
+                                        pycurl.SOCKET_TIMEOUT, 0)
+            except Exception, e:
+                ret = e[0]
+            if ret != pycurl.E_CALL_MULTI_PERFORM:
+                break
+        self._finish_pending_requests()
+
+    def _finish_pending_requests(self):
+        """Process any requests that were completed by the last
+        call to multi.socket_action.
+        """
+        while True:
+            num_q, ok_list, err_list = self._multi.info_read()
+            for curl in ok_list:
+                self._finish(curl)
+            for curl, errnum, errmsg in err_list:
+                self._finish(curl, errnum, errmsg)
+            if num_q == 0:
+                break
+        self._process_queue()
+
+    def _process_queue(self):
+        while True:
+            started = 0
+            while self._free_list and self._requests:
+                started += 1
+                curl = self._free_list.pop()
+                (request, callback) = self._requests.popleft()
+                curl.info = {
+                    "headers": {},
+                    "buffer": cStringIO.StringIO(),
+                    "request": request,
+                    "callback": callback,
+                    "start_time": time.time(),
+                }
+                _curl_setup_request(curl, request, curl.info["buffer"],
+                                    curl.info["headers"])
+                self._multi.add_handle(curl)
+
+            if not started:
+                break
+
+    def _finish(self, curl, curl_error=None, curl_message=None):
+        info = curl.info
+        curl.info = None
+        self._multi.remove_handle(curl)
+        self._free_list.append(curl)
+        buffer = info["buffer"]
+        if curl_error:
+            error = CurlError(curl_error, curl_message)
+            code = error.code
+            effective_url = None
+            buffer.close()
+            buffer = None
+        else:
+            error = None
+            code = curl.getinfo(pycurl.HTTP_CODE)
+            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
+            buffer.seek(0)
+        try:
+            info["callback"](HTTPResponse(
+                request=info["request"], code=code, headers=info["headers"],
+                buffer=buffer, effective_url=effective_url, error=error,
+                request_time=time.time() - info["start_time"]))
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            logging.error("Exception in callback %r", info["callback"],
+                          exc_info=True)
+
+
 class HTTPRequest(object):
     def __init__(self, url, method="GET", headers={}, body=None,
                  auth_username=None, auth_password=None,