import sys
import threading
import time
-import weakref
from tornado import httputil
from tornado import ioloop
from tornado import stack_context
from tornado.escape import utf8
-from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, main
+from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
+
+class CurlAsyncHTTPClient(AsyncHTTPClient):
+ def initialize(self, io_loop=None, max_clients=10,
+ max_simultaneous_connections=None):
+ self.io_loop = io_loop
+ self._multi = pycurl.CurlMulti()
+ self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
+ self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
+ self._curls = [_curl_create(max_simultaneous_connections)
+ for i in xrange(max_clients)]
+ self._free_list = self._curls[:]
+ self._requests = collections.deque()
+ self._fds = {}
+ self._timeout = None
-class AsyncHTTPClient(object):
- """An non-blocking HTTP client backed with pycurl.
-
- Example usage:
-
- import ioloop
-
- def handle_request(response):
- if response.error:
- print "Error:", response.error
- else:
- print response.body
- ioloop.IOLoop.instance().stop()
-
- http_client = httpclient.AsyncHTTPClient()
- http_client.fetch("http://www.google.com/", handle_request)
- ioloop.IOLoop.instance().start()
-
- fetch() can take a string URL or an HTTPRequest instance, which offers
- more options, like executing POST/PUT/DELETE requests.
-
- The keyword argument max_clients to the AsyncHTTPClient constructor
- determines the maximum number of simultaneous fetch() operations that
- can execute in parallel on each IOLoop.
- """
- _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
-
- def __new__(cls, io_loop=None, max_clients=10,
- max_simultaneous_connections=None):
- # There is one client per IOLoop since they share curl instances
- io_loop = io_loop or ioloop.IOLoop.instance()
- if io_loop in cls._ASYNC_CLIENTS:
- return cls._ASYNC_CLIENTS[io_loop]
- else:
- instance = super(AsyncHTTPClient, cls).__new__(cls)
- instance.io_loop = io_loop
- instance._multi = pycurl.CurlMulti()
- instance._multi.setopt(pycurl.M_TIMERFUNCTION,
- instance._set_timeout)
- instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
- instance._handle_socket)
- instance._curls = [_curl_create(max_simultaneous_connections)
- for i in xrange(max_clients)]
- instance._free_list = instance._curls[:]
- instance._requests = collections.deque()
- instance._fds = {}
- instance._timeout = None
- cls._ASYNC_CLIENTS[io_loop] = instance
-
- try:
- instance._socket_action = instance._multi.socket_action
- except AttributeError:
- # socket_action is found in pycurl since 7.18.2 (it's been
- # in libcurl longer than that but wasn't accessible to
- # python).
- logging.warning("socket_action method missing from pycurl; "
- "falling back to socket_all. Upgrading "
- "libcurl and pycurl will improve performance")
- instance._socket_action = \
- lambda fd, action: instance._multi.socket_all()
-
- # libcurl has bugs that sometimes cause it to not report all
- # relevant file descriptors and timeouts to TIMERFUNCTION/
- # SOCKETFUNCTION. Mitigate the effects of such bugs by
- # forcing a periodic scan of all active requests.
- instance._force_timeout_callback = ioloop.PeriodicCallback(
- instance._handle_force_timeout, 1000, io_loop=io_loop)
- instance._force_timeout_callback.start()
-
- return instance
+ try:
+ self._socket_action = self._multi.socket_action
+ except AttributeError:
+ # socket_action is found in pycurl since 7.18.2 (it's been
+ # in libcurl longer than that but wasn't accessible to
+ # python).
+ logging.warning("socket_action method missing from pycurl; "
+ "falling back to socket_all. Upgrading "
+ "libcurl and pycurl will improve performance")
+ self._socket_action = \
+ lambda fd, action: self._multi.socket_all()
+
+ # libcurl has bugs that sometimes cause it to not report all
+ # relevant file descriptors and timeouts to TIMERFUNCTION/
+ # SOCKETFUNCTION. Mitigate the effects of such bugs by
+ # forcing a periodic scan of all active requests.
+ self._force_timeout_callback = ioloop.PeriodicCallback(
+ self._handle_force_timeout, 1000, io_loop=io_loop)
+ self._force_timeout_callback.start()
def close(self):
- """Destroys this http client, freeing any file descriptors used.
- Not needed in normal use, but may be helpful in unittests that
- create and destroy http clients. No other methods may be called
- on the AsyncHTTPClient after close().
- """
- del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
self._force_timeout_callback.stop()
for curl in self._curls:
curl.close()
self._multi.close()
self._closed = True
+ super(CurlAsyncHTTPClient, self).close()
def fetch(self, request, callback, **kwargs):
- """Executes an HTTPRequest, calling callback with an HTTPResponse.
-
- If an error occurs during the fetch, the HTTPResponse given to the
- callback has a non-None error attribute that contains the exception
- encountered during the request. You can call response.rethrow() to
- throw the exception (if any) in the callback.
- """
if not isinstance(request, HTTPRequest):
request = HTTPRequest(url=request, **kwargs)
self._requests.append((request, stack_context.wrap(callback)))
def handle_callback_exception(self, callback):
self.io_loop.handle_callback_exception(callback)
-# For backwards compatibility: Tornado 1.0 included a new implementation of
-# AsyncHTTPClient that has since replaced the original. Define an alias
-# so anything that used AsyncHTTPClient2 still works
-AsyncHTTPClient2 = AsyncHTTPClient
-
class CurlError(HTTPError):
def __init__(self, errno, message):
import httplib
import os
import time
+import weakref
from tornado.escape import utf8
from tornado import httputil
response.rethrow()
return response
+class AsyncHTTPClient(object):
+ """An non-blocking HTTP client.
+
+ Example usage:
+
+ import ioloop
+
+ def handle_request(response):
+ if response.error:
+ print "Error:", response.error
+ else:
+ print response.body
+ ioloop.IOLoop.instance().stop()
+
+ http_client = httpclient.AsyncHTTPClient()
+ http_client.fetch("http://www.google.com/", handle_request)
+ ioloop.IOLoop.instance().start()
+
+ fetch() can take a string URL or an HTTPRequest instance, which offers
+ more options, like executing POST/PUT/DELETE requests.
+
+ The keyword argument max_clients to the AsyncHTTPClient constructor
+ determines the maximum number of simultaneous fetch() operations that
+ can execute in parallel on each IOLoop.
+ """
+ _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
+
+ def __new__(cls, io_loop=None, max_clients=10, force_instance=False,
+ **kwargs):
+ io_loop = io_loop or IOLoop.instance()
+ if io_loop in cls._ASYNC_CLIENTS and not force_instance:
+ return cls._ASYNC_CLIENTS[io_loop]
+ else:
+ if cls is AsyncHTTPClient:
+ cls = AsyncImpl
+ instance = super(AsyncHTTPClient, cls).__new__(cls)
+ instance.initialize(io_loop, max_clients, **kwargs)
+ if not force_instance:
+ cls._ASYNC_CLIENTS[io_loop] = instance
+ return instance
+
+ def close(self):
+ """Destroys this http client, freeing any file descriptors used.
+ Not needed in normal use, but may be helpful in unittests that
+ create and destroy http clients. No other methods may be called
+ on the AsyncHTTPClient after close().
+ """
+ if self._ASYNC_CLIENTS[self.io_loop] is self:
+ del self._ASYNC_CLIENTS[self.io_loop]
+
+ def fetch(self, request, callback, **kwargs):
+ """Executes an HTTPRequest, calling callback with an HTTPResponse.
+
+ If an error occurs during the fetch, the HTTPResponse given to the
+ callback has a non-None error attribute that contains the exception
+ encountered during the request. You can call response.rethrow() to
+ throw the exception (if any) in the callback.
+ """
+ raise NotImplementedError()
+
class HTTPRequest(object):
def __init__(self, url, method="GET", headers=None, body=None,
auth_username=None, auth_password=None,
# and may be removed or replaced with a better way of specifying the preferred
# HTTPClient implementation before the next release.
if os.environ.get("USE_SIMPLE_HTTPCLIENT"):
- from tornado.simple_httpclient import AsyncHTTPClient
+ from tornado.simple_httpclient import SimpleAsyncHTTPClient as AsyncImpl
else:
- from tornado.curl_httpclient import AsyncHTTPClient
+ from tornado.curl_httpclient import CurlAsyncHTTPClient as AsyncImpl
if __name__ == "__main__":
main()
from __future__ import with_statement
from cStringIO import StringIO
-from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError
+from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient
from tornado.httputil import HTTPHeaders
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
import socket
import time
import urlparse
-import weakref
import zlib
try:
_DEFAULT_CA_CERTS = os.path.dirname(__file__) + '/ca-certificates.crt'
-class AsyncHTTPClient(object):
+class SimpleAsyncHTTPClient(AsyncHTTPClient):
"""Non-blocking HTTP client with no external dependencies.
This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
should use the curl-based AsyncHTTPClient if HTTPS support is required.
"""
- _ASYNC_CLIENTS = weakref.WeakKeyDictionary()
-
- def __new__(cls, io_loop=None, max_clients=10,
- max_simultaneous_connections=None,
- force_instance=False,
- hostname_mapping=None):
+ def initialize(self, io_loop=None, max_clients=10,
+ max_simultaneous_connections=None,
+ hostname_mapping=None):
"""Creates a AsyncHTTPClient.
Only a single AsyncHTTPClient instance exists per IOLoop
settings like /etc/hosts is not possible or desirable (e.g. in
unittests).
"""
- io_loop = io_loop or IOLoop.instance()
- if io_loop in cls._ASYNC_CLIENTS and not force_instance:
- return cls._ASYNC_CLIENTS[io_loop]
- else:
- instance = super(AsyncHTTPClient, cls).__new__(cls)
- instance.io_loop = io_loop
- instance.max_clients = max_clients
- instance.queue = collections.deque()
- instance.active = {}
- instance.hostname_mapping = hostname_mapping
- if not force_instance:
- cls._ASYNC_CLIENTS[io_loop] = instance
- return instance
-
- def close(self):
- pass
+ self.io_loop = io_loop
+ self.max_clients = max_clients
+ self.queue = collections.deque()
+ self.active = {}
+ self.hostname_mapping = hostname_mapping
def fetch(self, request, callback, **kwargs):
if not isinstance(request, HTTPRequest):
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")
-# Alias for backwards compatibility
-SimpleAsyncHTTPClient = AsyncHTTPClient
-
def main():
from tornado.options import define, options, parse_command_line
define("print_headers", type=bool, default=False)
define("print_body", type=bool, default=True)
define("follow_redirects", type=bool, default=True)
args = parse_command_line()
- client = AsyncHTTPClient()
+ client = SimpleAsyncHTTPClient()
io_loop = IOLoop.instance()
for arg in args:
def callback(response):
def setUp(self):
super(SimpleHTTPClientTestCase, self).setUp()
# replace the client defined in the parent class
- self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop)
+ self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
+ force_instance=True)
def test_hello_world(self):
response = self.fetch("/hello")