except NameError:
long = int # py3
+
class AuthError(Exception):
pass
else:
FUTURES = (futures.Future, Future)
+
def is_future(x):
return isinstance(x, FUTURES)
moment = Future()
moment.__doc__ = \
-"""A special object which may be yielded to allow the IOLoop to run for
+ """A special object which may be yielded to allow the IOLoop to run for
one iteration.
This is not needed in normal use but it can be helpful in long-running
"""
moment.set_result(None)
+
class Runner(object):
"""Internal implementation of `tornado.gen.engine`.
"yielded unknown object %r" % (yielded,)))
return True
-
def result_callback(self, key):
return stack_context.wrap(_argument_adapter(
functools.partial(self.set_result, key)))
Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
+
def _argument_adapter(callback):
"""Returns a function that when invoked runs ``callback`` with one arg.
from tornado import stack_context
from tornado.util import GzipDecompressor
+
class HTTP1ConnectionParameters(object):
"""Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`.
"""
self.body_timeout = body_timeout
self.use_gzip = use_gzip
+
class HTTP1Connection(httputil.HTTPConnection):
"""Implements the HTTP/1.x protocol.
need_delegate_close = False
try:
header_future = self.stream.read_until_regex(
- b"\r?\n\r?\n",
- max_bytes=self.params.max_header_size)
+ b"\r?\n\r?\n",
+ max_bytes=self.params.max_header_size)
if self.params.header_timeout is None:
header_data = yield header_future
else:
skip_body = False
if self.is_client:
if (self._request_start_line is not None and
- self._request_start_line.method == 'HEAD'):
+ self._request_start_line.method == 'HEAD'):
skip_body = True
code = start_line.code
if code == 304:
yield self._read_message(delegate)
else:
if (headers.get("Expect") == "100-continue" and
- not self._write_finished):
+ not self._write_finished):
self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
if not skip_body:
body_future = self._read_body(headers, delegate)
# response, and we're not detached, register a close callback
# on the stream (we didn't need one while we were reading)
if (not self._finish_future.done() and
- self.stream is not None and
- not self.stream.closed()):
+ self.stream is not None and
+ not self.stream.closed()):
self.stream.set_close_callback(self._on_connection_close)
yield self._finish_future
if self.is_client and self._disconnect_on_finish:
def finish(self):
"""Implements `.HTTPConnection.finish`."""
if (self._expected_content_remaining is not None and
- self._expected_content_remaining != 0 and
- not self.stream.closed()):
+ self._expected_content_remaining != 0 and
+ not self.stream.closed()):
self.stream.close()
raise httputil.HTTPOutputException(
"Tried to write %d bytes less than Content-Length" %
self.effective_url = effective_url
if error is None:
if self.code < 200 or self.code >= 300:
- self.error = HTTPError(self.code, message=self.reason,
+ self.error = HTTPError(self.code, message=self.reason,
response=self)
else:
self.error = None
self.address_family = None
# In HTTPServerRequest we want an IP, not a full socket address.
if (self.address_family in (socket.AF_INET, socket.AF_INET6) and
- address is not None):
+ address is not None):
self.remote_ip = address[0]
else:
# Unix (or other) socket; fake the remote address.
self._orig_remote_ip = self.remote_ip
self._orig_protocol = self.protocol
-
def __str__(self):
if self.address_family in (socket.AF_INET, socket.AF_INET6):
return self.remote_ip
from ssl import SSLError
except ImportError:
# ssl is unavailable on app engine.
- class SSLError(Exception): pass
+ class SSLError(Exception):
+ pass
+
class _NormalizedHeaderCache(dict):
"""Dynamic cached mapping of header names to Http-Header-Case.
pass
-
class UnsatisfiableReadError(Exception):
"""Exception raised when a read cannot be satisfied.
# A chunk size that is too close to max_buffer_size can cause
# spurious failures.
self.read_chunk_size = min(read_chunk_size or 65536,
- self.max_buffer_size//2)
+ self.max_buffer_size // 2)
self.error = None
self._read_buffer = collections.deque()
self._write_buffer = collections.deque()
self._connect_future = None
for future in futures:
if (isinstance(self.error, (socket.error, IOError)) and
- errno_from_exception(self.error) in _ERRNO_CONNRESET):
+ errno_from_exception(self.error) in _ERRNO_CONNRESET):
# Treat connection resets as closed connections so
# clients only have to catch one kind of exception
# to avoid logging.
#
# If we've reached target_bytes, we know we're done.
if (target_bytes is not None and
- self._read_buffer_size >= target_bytes):
+ self._read_buffer_size >= target_bytes):
break
# Otherwise, we need to call the more expensive find_read_pos.
def _check_max_bytes(self, delimiter, size):
if (self._read_max_bytes is not None and
- size > self._read_max_bytes):
+ size > self._read_max_bytes):
raise UnsatisfiableReadError(
"delimiter %r not found within %d bytes" % (
delimiter, self._read_max_bytes))
.. versionadded:: 3.3
"""
if (self._read_callback or self._read_future or
- self._write_callback or self._write_future or
- self._connect_callback or self._connect_future or
- self._pending_callbacks or self._closed or
- self._read_buffer or self._write_buffer):
+ self._write_callback or self._write_future or
+ self._connect_callback or self._connect_future or
+ self._pending_callbacks or self._closed or
+ self._read_buffer or self._write_buffer):
raise ValueError("IOStream is not idle; cannot convert to SSL")
if ssl_options is None:
ssl_options = {}
ssl_stream.read_chunk_size = self.read_chunk_size
return future
-
def _handle_connect(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
DEFAULT_FORMAT = '%(color)s[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d]%(end_color)s %(message)s'
DEFAULT_DATE_FORMAT = '%y%m%d %H:%M:%S'
DEFAULT_COLORS = {
- logging.DEBUG: 4, # Blue
- logging.INFO: 2, # Green
- logging.WARNING: 3, # Yellow
- logging.ERROR: 1, # Red
+ logging.DEBUG: 4, # Blue
+ logging.INFO: 2, # Green
+ logging.WARNING: 3, # Yellow
+ logging.ERROR: 1, # Red
}
def __init__(self, color=True, fmt=DEFAULT_FORMAT,
0, flags)):
af, socktype, proto, canonname, sockaddr = res
if (platform.system() == 'Darwin' and address == 'localhost' and
- af == socket.AF_INET6 and sockaddr[3] != 0):
+ af == socket.AF_INET6 and sockaddr[3] != 0):
# Mac OS X includes a link-local address fe80::1%lo0 in the
# getaddrinfo results for 'localhost'. However, the firewall
# doesn't understand that this is a local address and will
from tornado.platform.windows import set_close_exec
elif 'APPENGINE_RUNTIME' in os.environ:
from tornado.platform.common import Waker
- def set_close_exec(fd): pass
+ def set_close_exec(fd):
+ pass
else:
from tornado.platform.posix import set_close_exec, Waker
except ImportError:
certifi = None
+
def _default_ca_certs():
if certifi is None:
raise Exception("The 'certifi' package is required to use https "
"in simple_httpclient")
return certifi.where()
+
class SimpleAsyncHTTPClient(AsyncHTTPClient):
"""Non-blocking HTTP client with no external dependencies.
if not self.request.allow_nonstandard_methods:
if self.request.method in ("POST", "PATCH", "PUT"):
if (self.request.body is None and
- self.request.body_producer is None):
+ self.request.body_producer is None):
raise AssertionError(
'Body must not be empty for "%s" request'
% self.request.method)
else:
if (self.request.body is not None or
- self.request.body_producer is not None):
+ self.request.body_producer is not None):
raise AssertionError(
'Body must be empty for "%s" request'
% self.request.method)
if self.request.use_gzip:
self.request.headers["Accept-Encoding"] = "gzip"
req_path = ((self.parsed.path or '/') +
- (('?' + self.parsed.query) if self.parsed.query else ''))
+ (('?' + self.parsed.query) if self.parsed.query else ''))
self.stream.set_nodelay(True)
self.connection = HTTP1Connection(
self.stream, True,
if start_read:
self._read_response()
-
def _read_response(self):
# Ensure that any exception raised in read_response ends up in our
# stack context.
_INITIAL_CONNECT_TIMEOUT = 0.3
+
class _Connector(object):
"""A stateless implementation of the "Happy Eyeballs" algorithm.
# ssl is not available on Google App Engine.
ssl = None
+
class TCPServer(object):
r"""A non-blocking, single-threaded TCP server.
if allowed_parents is not None:
if not in_block:
raise ParseError("%s outside %s block" %
- (operator, allowed_parents))
+ (operator, allowed_parents))
if in_block not in allowed_parents:
raise ParseError("%s block cannot be attached to %s block" % (operator, in_block))
body.chunks.append(_IntermediateControlBlock(contents, line))
'Digest realm="%s", nonce="%s", opaque="%s"' %
(realm, nonce, opaque))
+
class CustomReasonHandler(RequestHandler):
def get(self):
self.set_status(200, "Custom reason")
+
class CustomFailReasonHandler(RequestHandler):
def get(self):
self.set_status(400, "Custom reason")
+
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
def setUp(self):
self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
lambda: future.set_result('asdf'))
result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
- future)
+ future)
self.assertEqual(result, 'asdf')
@gen_test
response = self.fetch('/', method='POST', body='foo=bar')
self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
+
class GzipTest(GzipBaseTest, AsyncHTTPTestCase):
def get_httpserver_options(self):
return dict(gzip=True)
response = self.post_gzip('foo=bar')
self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
+
class GzipUnsupportedTest(GzipBaseTest, AsyncHTTPTestCase):
def test_gzip_unsupported(self):
# Gzip support is opt-in; without it the server fails to parse
self.assertEqual([16, 16, 16, 2], chunks)
def test_compressed_body(self):
- self.fetch_chunk_sizes(body=self.compress(self.BODY),
- headers={'Content-Encoding': 'gzip'})
- # Compression creates irregular boundaries so the assertions
- # in fetch_chunk_sizes are as specific as we can get.
+ self.fetch_chunk_sizes(body=self.compress(self.BODY),
+ headers={'Content-Encoding': 'gzip'})
+ # Compression creates irregular boundaries so the assertions
+ # in fetch_chunk_sizes are as specific as we can get.
def test_chunked_body(self):
def body_producer(write):
return SimpleAsyncHTTPClient(io_loop=self.io_loop)
def test_small_body(self):
- response = self.fetch('/buffered', method='PUT', body=b'a'*4096)
+ response = self.fetch('/buffered', method='PUT', body=b'a' * 4096)
self.assertEqual(response.body, b'4096')
- response = self.fetch('/streaming', method='PUT', body=b'a'*4096)
+ response = self.fetch('/streaming', method='PUT', body=b'a' * 4096)
self.assertEqual(response.body, b'4096')
def test_large_body_buffered(self):
with ExpectLog(gen_log, '.*Content-Length too long'):
- response = self.fetch('/buffered', method='PUT', body=b'a'*10240)
+ response = self.fetch('/buffered', method='PUT', body=b'a' * 10240)
self.assertEqual(response.code, 599)
def test_large_body_buffered_chunked(self):
with ExpectLog(gen_log, '.*chunked body too large'):
response = self.fetch('/buffered', method='PUT',
- body_producer=lambda write: write(b'a'*10240))
+ body_producer=lambda write: write(b'a' * 10240))
self.assertEqual(response.code, 599)
def test_large_body_streaming(self):
with ExpectLog(gen_log, '.*Content-Length too long'):
- response = self.fetch('/streaming', method='PUT', body=b'a'*10240)
+ response = self.fetch('/streaming', method='PUT', body=b'a' * 10240)
self.assertEqual(response.code, 599)
def test_large_body_streaming_chunked(self):
with ExpectLog(gen_log, '.*chunked body too large'):
response = self.fetch('/streaming', method='PUT',
- body_producer=lambda write: write(b'a'*10240))
+ body_producer=lambda write: write(b'a' * 10240))
self.assertEqual(response.code, 599)
def test_large_body_streaming_override(self):
response = self.fetch('/streaming?expected_size=10240', method='PUT',
- body=b'a'*10240)
+ body=b'a' * 10240)
self.assertEqual(response.body, b'10240')
def test_large_body_streaming_chunked_override(self):
response = self.fetch('/streaming?expected_size=10240', method='PUT',
- body_producer=lambda write: write(b'a'*10240))
+ body_producer=lambda write: write(b'a' * 10240))
self.assertEqual(response.body, b'10240')
@gen_test
# Use a raw stream so we can make sure it's all on one connection.
stream.write(b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
b'Content-Length: 10240\r\n\r\n')
- stream.write(b'a'*10240)
+ stream.write(b'a' * 10240)
headers, response = yield gen.Task(read_stream_body, stream)
self.assertEqual(response, b'10240')
# Without the ?expected_size parameter, we get the old default value
import ssl
import sys
+
def _server_ssl_options():
return dict(
certfile=os.path.join(os.path.dirname(__file__), 'test.crt'),
keyfile=os.path.join(os.path.dirname(__file__), 'test.key'),
)
+
class HelloHandler(RequestHandler):
def get(self):
self.write("Hello")
server.close()
client.close()
+
class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
def _make_client_iostream(self):
return IOStream(socket.socket(), io_loop=self.io_loop)
self.stream.write("\n")
return result
+
def main():
# The -W command-line option does not work in a virtualenv with
# python 3 (as of virtualenv 1.7), so configure warnings
# and AF_INET6 because some installations do not have AF_INET6.
AF1, AF2 = 1, 2
+
class TestTCPServer(TCPServer):
def __init__(self, family):
super(TestTCPServer, self).__init__()
for stream in self.streams:
stream.close()
+
class TCPClientTest(AsyncTestCase):
def setUp(self):
super(TCPClientTest, self).setUp()
if socket.AF_INET6 not in families:
self.skipTest("localhost does not resolve to ipv6")
-
@gen_test
def do_test_connect(self, family, host):
port = self.start_server(family)
#}{{i
}}{% end
%}""",
- })
+ })
self.assertEqual(loader.load("foo.txt").generate(items=range(5)),
b"0, 1, 2, 3, 4")
sig = match.group(2)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
- 'foo', '12345678', timestamp),
+ 'foo', '12345678', timestamp),
sig)
# shifting digits from payload to timestamp doesn't alter signature
# (this is not desirable behavior, just confirming that that's how it
# works)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
- 'foo', '1234', b'5678' + timestamp),
+ 'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
"""
return getattr(self.orig_method, name)
+
class AsyncTestCase(unittest.TestCase):
"""`~unittest.TestCase` subclass for testing `.IOLoop`-based
asynchronous code.
except Exception as e:
self._handle_request_exception(e)
if (self._prepared_future is not None and
- not self._prepared_future.done()):
+ not self._prepared_future.done()):
# In case we failed before setting _prepared_future, do it
# now (to unblock the HTTP server). Note that this is not
# in a finally block to avoid GC issues prior to Python 3.4.
StaticFileHandler.reset()
self.handler = self.handler_class(self.application, self.request,
- **self.handler_kwargs)
+ **self.handler_kwargs)
transforms = [t(self.request) for t in self.application.transforms]
if self.stream_request_body:
return self.handler._prepared_future
-
class HTTPError(Exception):
"""An exception that will turn into an HTTP error response.
# A leading version number in decimal with no leading zeros, followed by a pipe.
_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")
-def decode_signed_value(secret, name, value, max_age_days=31, clock=None,min_version=None):
+
+def decode_signed_value(secret, name, value, max_age_days=31, clock=None, min_version=None):
if clock is None:
clock = time.time
if min_version is None:
else:
return None
+
def _decode_signed_value_v1(secret, name, value, max_age_days, clock):
parts = utf8(value).split(b"|")
if len(parts) != 3:
field_value = rest[:n]
# In python 3, indexing bytes returns small integers; we must
# use a slice to get a byte string as in python 2.
- if rest[n:n+1] != b'|':
+ if rest[n:n + 1] != b'|':
raise ValueError("malformed v2 signed value field")
- rest = rest[n+1:]
+ rest = rest[n + 1:]
return field_value, rest
rest = value[2:] # remove version number
try:
hash.update(utf8(part))
return utf8(hash.hexdigest())
+
def _create_signature_v2(secret, s):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
hash.update(utf8(s))
return utf8(hash.hexdigest())
+
def _unquote_or_none(s):
"""None-safe wrapper around url_unescape to handle unamteched optional
groups correctly.
return unmasked.tostring()
if (os.environ.get('TORNADO_NO_EXTENSION') or
- os.environ.get('TORNADO_EXTENSION') == '0'):
+ os.environ.get('TORNADO_EXTENSION') == '0'):
# These environment variables exist to make it easier to do performance
# comparisons; they are not guaranteed to remain supported in the future.
_websocket_mask = _websocket_mask_python
def finish(self):
if (self._expected_content_remaining is not None and
- self._expected_content_remaining != 0):
+ self._expected_content_remaining != 0):
self._error = httputil.HTTPOutputException(
"Tried to write %d bytes less than Content-Length" %
self._expected_content_remaining)