except ImportError:
import urllib as urllib_parse # py2
+
class AuthError(Exception):
pass
+
def _auth_future_to_callback(callback, future):
try:
result = future.result()
result = None
callback(result)
+
def _auth_return_future(f):
"""Similar to tornado.concurrent.return_future, but uses the auth
module's legacy callback interface.
inside the function will actually be a future.
"""
replacer = ArgReplacer(f, 'callback')
+
@functools.wraps(f)
def wrapper(*args, **kwargs):
future = Future()
return future
return wrapper
+
class OpenIdMixin(object):
"""Abstract implementation of OpenID and Attribute Exchange.
def _on_authentication_verified(self, future, response):
if response.error or b"is_valid:true" not in response.body:
future.set_exception(AuthError(
- "Invalid OpenID response: %s" % (response.error or
- response.body)))
+ "Invalid OpenID response: %s" % (response.error or
+ response.body)))
return
# Make sure we got back at least an email from attribute exchange
request_cookie = self.get_cookie("_oauth_request_token")
if not request_cookie:
future.set_exception(AuthError(
- "Missing OAuth request token cookie"))
+ "Missing OAuth request token cookie"))
return
self.clear_cookie("_oauth_request_token")
cookie_key, cookie_secret = [base64.b64decode(escape.utf8(i)) for i in request_cookie.split("|")]
if cookie_key != request_key:
future.set_exception(AuthError(
- "Request token does not match cookie"))
+ "Request token does not match cookie"))
return
token = dict(key=cookie_key, secret=cookie_secret)
if oauth_verifier:
def _on_twitter_request(self, future, response):
if response.error:
future.set_exception(AuthError(
- "Error response %s fetching %s" % (response.error,
- response.request.url)))
+ "Error response %s fetching %s" % (response.error,
+ response.request.url)))
return
future.set_result(escape.json_decode(response.body))
def _on_friendfeed_request(self, future, response):
if response.error:
future.set_exception(AuthError(
- "Error response %s fetching %s" % (response.error,
- response.request.url)))
+ "Error response %s fetching %s" % (response.error,
+ response.request.url)))
return
future.set_result(escape.json_decode(response.body))
except ImportError:
futures = None
+
class ReturnValueIgnoredError(Exception):
pass
+
class _DummyFuture(object):
def __init__(self):
self._done = False
_NO_RESULT = object()
+
def return_future(f):
"""Decorator to make a function that returns via callback return a `Future`.
consider using ``@gen.coroutine`` instead of this combination.
"""
replacer = ArgReplacer(f, 'callback')
+
@functools.wraps(f)
def wrapper(*args, **kwargs):
future = TracebackFuture()
return future
return wrapper
+
def chain_future(a, b):
"""Chain two futures together so that when one completes, so does the other.
def copy(future):
assert future is a
if (isinstance(a, TracebackFuture) and isinstance(b, TracebackFuture)
- and a.exc_info() is not None):
+ and a.exc_info() is not None):
b.set_exc_info(a.exc_info())
elif a.exception() is not None:
b.set_exception(a.exception())
def is_ready(self):
finished = list(itertools.takewhile(
- lambda i: i.is_ready(), self.unfinished_children))
+ lambda i: i.is_ready(), self.unfinished_children))
self.unfinished_children.difference_update(finished)
return not self.unfinished_children
If an error occurs during the fetch, we raise an `HTTPError`.
"""
response = self._io_loop.run_sync(functools.partial(
- self._async_client.fetch, request, **kwargs))
+ self._async_client.fetch, request, **kwargs))
response.rethrow()
return response
future = Future()
if callback is not None:
callback = stack_context.wrap(callback)
+
def handle_future(future):
exc = future.exception()
if isinstance(exc, HTTPError) and exc.response is not None:
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
+
def handle_response(response):
if response.error:
future.set_exception(response.error)
# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
+
+
def _parseparam(s):
while s[:1] == ';':
s = s[1:]
IOLoop.instance().run_sync(main)
"""
future_cell = [None]
+
def run():
try:
result = func()
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()
-
def time(self):
"""Returns the current time according to the IOLoop's clock.
results.append((family, address))
return results
+
class BlockingResolver(ExecutorResolver):
def initialize(self, io_loop=None):
super(BlockingResolver, self).initialize(io_loop=io_loop)
+
class ThreadedResolver(ExecutorResolver):
def initialize(self, io_loop=None, num_threads=10):
from concurrent.futures import ThreadPoolExecutor
super(ThreadedResolver, self).initialize(
io_loop=io_loop, executor=ThreadPoolExecutor(num_threads))
+
class OverrideResolver(Resolver):
"""Wraps a resolver with a mapping of overrides.
return self.resolver.resolve(host, port, *args, **kwargs)
-
# These are the keyword arguments to ssl.wrap_socket that must be translated
# to their SSLContext equivalents (the other arguments are still passed
# to SSLContext.wrap_socket).
_SSL_CONTEXT_KEYWORDS = frozenset(['ssl_version', 'certfile', 'keyfile',
'cert_reqs', 'ca_certs', 'ciphers'])
+
def ssl_options_to_context(ssl_options):
"""Try to Convert an ssl_options dictionary to an SSLContext object.
if isinstance(ssl_options, dict):
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options
if (not hasattr(ssl, 'SSLContext') or
- isinstance(ssl_options, ssl.SSLContext)):
+ isinstance(ssl_options, ssl.SSLContext)):
return ssl_options
context = ssl.SSLContext(
ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23))
class SSLCertificateError(ValueError):
pass
-
def _dnsname_to_pat(dn):
pats = []
for frag in dn.split(r'.'):
pats.append(frag.replace(r'\*', '[^.]*'))
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
-
def ssl_match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules
from tornado.ioloop import IOLoop
from tornado.netutil import Resolver, is_valid_ip
+
class CaresResolver(Resolver):
"""Name resolver based on the c-ares library.
(family, resolved_family))
result = [
(resolved_family, (resolved, port)),
- ]
+ ]
raise gen.Return(result)
from tornado.netutil import bind_sockets, add_accept_handler, ssl_wrap_socket
from tornado import process
+
class TCPServer(object):
r"""A non-blocking, single-threaded TCP server.
self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
self._TWITTER_BASE_URL = test.get_url('/twitter/api')
-
def get_auth_http_client(self):
return self.settings['http_client']
# The following series of classes demonstrate and test various styles
# of use, with and without generators and futures.
+
+
class CapServer(TCPServer):
def handle_stream(self, stream, address):
logging.info("handle_stream")
if pycurl is not None:
from tornado.curl_httpclient import CurlAsyncHTTPClient
+
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
def get_http_client(self):
# can be caught and replaced.
@gen.coroutine
def f2():
- self.io_loop.add_callback(lambda: 1/ 0)
+ self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
# can be caught and ignored.
@gen.coroutine
def f2():
- self.io_loop.add_callback(lambda: 1/ 0)
+ self.io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(self.io_loop.add_timeout,
self.io_loop.time() + 10)
self.finished = True
-
class GenSequenceHandler(RequestHandler):
@asynchronous
@gen.engine
except ImportError:
from cStringIO import StringIO as BytesIO
+
class HelloWorldHandler(RequestHandler):
def get(self):
name = self.get_argument("name", "world")
server.read_until(b"\r\n", self.stop)
data = self.wait()
self.assertEqual(data, b"abcd\r\n")
+
def closed_callback(chunk):
self.fail()
server.read_until_close(callback=closed_callback,
else:
from tornado.platform.twisted import TwistedResolver
+
class _ResolverTestMixin(object):
def test_localhost(self):
self.resolver.resolve('localhost', 80, callback=self.stop)
hostname_mapping={
'www.example.com': '127.0.0.1',
('foo.example.com', 8000): ('127.0.0.1', self.get_http_port()),
- })
+ })
def get_app(self):
- return Application([url("/hello", HelloWorldHandler),])
+ return Application([url("/hello", HelloWorldHandler), ])
def test_hostname_mapping(self):
self.http_client.fetch(
except ImportError:
from io import StringIO # py3
+
class RaiseExcInfoTest(unittest.TestCase):
def test_two_arg_exception(self):
# This test would fail on python 3 if raise_exc_info were simply
wsgi_safe_tests = []
+
def wsgi_safe(cls):
wsgi_safe_tests.append(cls)
return cls
+
class WebTestCase(AsyncHTTPTestCase):
"""Base class for web tests that also supports WSGI mode.
from tornado.web import Application
from tornado.websocket import WebSocketHandler, WebSocketConnect
+
class EchoHandler(WebSocketHandler):
def on_message(self, message):
self.write_message(message, isinstance(message, bytes))
+
class WebSocketTest(AsyncHTTPTestCase):
def get_app(self):
return Application([
- ('/echo', EchoHandler),
- ])
+ ('/echo', EchoHandler),
+ ])
@gen_test
def test_websocket_gen(self):
"""
f = gen.coroutine(f)
+
@functools.wraps(f)
def wrapper(self):
return self.io_loop.run_sync(functools.partial(f, self), timeout=5)
logging.basicConfig()
handler = logger.handlers[0]
if (len(logger.handlers) > 1 or
- not isinstance(handler, logging.StreamHandler)):
+ not isinstance(handler, logging.StreamHandler)):
# Logging has been configured in a way we don't recognize,
# so just leave it alone.
super(LogTrapTestCase, self).run(result)
def clear(self):
"""Resets all headers and content for this response."""
self._headers = httputil.HTTPHeaders({
- "Server": "TornadoServer/%s" % tornado.version,
- "Content-Type": "text/html; charset=UTF-8",
- "Date": httputil.format_timestamp(time.gmtime()),
- })
+ "Server": "TornadoServer/%s" % tornado.version,
+ "Content-Type": "text/html; charset=UTF-8",
+ "Date": httputil.format_timestamp(time.gmtime()),
+ })
self.set_default_headers()
if not self.request.supports_http_1_1():
if self.request.headers.get("Connection") == "Keep-Alive":
except NameError:
xrange = range # py3
+
class WebSocketHandler(tornado.web.RequestHandler):
"""Subclass this class to create a basic WebSocket handler.
return WebSocketProtocol13.compute_accept_value(
self.request.headers.get("Sec-Websocket-Key"))
-
def _accept_connection(self):
subprotocol_header = ''
subprotocols = self.request.headers.get("Sec-WebSocket-Protocol", '')
scheme = {'ws': 'http', 'wss': 'https'}[scheme]
request.url = scheme + sep + rest
request.headers.update({
- 'Upgrade': 'websocket',
- 'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': self.key,
- 'Sec-WebSocket-Version': '13',
- })
+ 'Upgrade': 'websocket',
+ 'Connection': 'Upgrade',
+ 'Sec-WebSocket-Key': self.key,
+ 'Sec-WebSocket-Version': '13',
+ })
super(_WebSocketClientConnection, self).__init__(
io_loop, None, request, lambda: None, lambda response: None,
def _on_close(self):
self.on_message(None)
-
def _handle_1xx(self, code):
assert code == 101
assert self.headers['Upgrade'].lower() == 'websocket'