return args
def _on_authentication_verified(self, callback, response):
- if response.error or b("is_valid:true") not in response.body:
+ if response.error or b"is_valid:true" not in response.body:
gen_log.warning("Invalid OpenID response: %s", response.error or
response.body)
callback(None)
if response.error:
raise Exception("Could not get request token")
request_token = _oauth_parse_response(response.body)
- data = (base64.b64encode(request_token["key"]) + b("|") +
+ data = (base64.b64encode(request_token["key"]) + b"|" +
base64.b64encode(request_token["secret"]))
self.set_cookie("_oauth_request_token", data)
args = dict(oauth_token=request_token["key"])
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.twitter_request(
- "/users/show/" + escape.native_str(access_token[b("screen_name")]),
+ "/users/show/" + escape.native_str(access_token[b"screen_name"]),
access_token=access_token, callback=callback)
def _parse_user_response(self, callback, user):
key_elems = [escape.utf8(consumer_token["secret"])]
key_elems.append(escape.utf8(token["secret"] if token else ""))
- key = b("&").join(key_elems)
+ key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
- key = b("&").join(key_elems)
+ key = b"&".join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
def _oauth_parse_response(body):
p = escape.parse_qs(body, keep_blank_values=False)
- token = dict(key=p[b("oauth_token")][0], secret=p[b("oauth_token_secret")][0])
+ token = dict(key=p[b"oauth_token"][0], secret=p[b"oauth_token_secret"][0])
# Add the extra parameters the Provider included to the token
- special = (b("oauth_token"), b("oauth_token_secret"))
+ special = (b"oauth_token", b"oauth_token_secret")
token.update((k, p[k][0]) for k in p if k not in special)
return token
# Save stack context here, outside of any request. This keeps
# contexts from one request from leaking into the next.
self._header_callback = stack_context.wrap(self._on_headers)
- self.stream.read_until(b("\r\n\r\n"), self._header_callback)
+ self.stream.read_until(b"\r\n\r\n", self._header_callback)
self._write_callback = None
def close(self):
# Use a try/except instead of checking stream.closed()
# directly, because in some cases the stream doesn't discover
# that it's closed until you try to read from it.
- self.stream.read_until(b("\r\n\r\n"), self._header_callback)
+ self.stream.read_until(b"\r\n\r\n", self._header_callback)
except iostream.StreamClosedError:
self.close()
if content_length > self.stream.max_buffer_size:
raise _BadRequestException("Content-Length too long")
if headers.get("Expect") == "100-continue":
- self.stream.write(b("HTTP/1.1 100 (Continue)\r\n\r\n"))
+ self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
self.stream.read_bytes(content_length, self._on_request_body)
return
# xmpp). I think we're also supposed to handle backslash-escapes
# here but I'll save that until we see a client that uses them
# in the wild.
- if boundary.startswith(b('"')) and boundary.endswith(b('"')):
+ if boundary.startswith(b'"') and boundary.endswith(b'"'):
boundary = boundary[1:-1]
- final_boundary_index = data.rfind(b("--") + boundary + b("--"))
+ final_boundary_index = data.rfind(b"--" + boundary + b"--")
if final_boundary_index == -1:
gen_log.warning("Invalid multipart/form-data: no final boundary")
return
- parts = data[:final_boundary_index].split(b("--") + boundary + b("\r\n"))
+ parts = data[:final_boundary_index].split(b"--" + boundary + b"\r\n")
for part in parts:
if not part:
continue
- eoh = part.find(b("\r\n\r\n"))
+ eoh = part.find(b"\r\n\r\n")
if eoh == -1:
gen_log.warning("multipart/form-data missing headers")
continue
headers = HTTPHeaders.parse(part[:eoh].decode("utf-8"))
disp_header = headers.get("Content-Disposition", "")
disposition, disp_params = _parse_header(disp_header)
- if disposition != "form-data" or not part.endswith(b("\r\n")):
+ if disposition != "form-data" or not part.endswith(b"\r\n"):
gen_log.warning("Invalid multipart/form-data")
continue
value = part[eoh + 4:-2]
def _consume(self, loc):
if loc == 0:
- return b("")
+ return b""
_merge_prefix(self._read_buffer, loc)
self._read_buffer_size -= loc
return self._read_buffer.popleft()
if prefix:
deque.appendleft(type(prefix[0])().join(prefix))
if not deque:
- deque.appendleft(b(""))
+ deque.appendleft(b"")
def doctests():
def wake(self):
try:
- self.writer.send(b("x"))
+ self.writer.send(b"x")
except (IOError, socket.error):
pass
def wake(self):
try:
- self.writer.write(b("x"))
+ self.writer.write(b"x")
except IOError:
pass
username = self.request.auth_username
password = self.request.auth_password or ''
if username is not None:
- auth = utf8(username) + b(":") + utf8(password)
- self.request.headers["Authorization"] = (b("Basic ") +
+ auth = utf8(username) + b":" + utf8(password)
+ self.request.headers["Authorization"] = (b"Basic " +
base64.b64encode(auth))
if self.request.user_agent:
self.request.headers["User-Agent"] = self.request.user_agent
request_lines = [utf8("%s %s HTTP/1.1" % (self.request.method,
req_path))]
for k, v in self.request.headers.get_all():
- line = utf8(k) + b(": ") + utf8(v)
- if b('\n') in line:
+ line = utf8(k) + b": " + utf8(v)
+ if b'\n' in line:
raise ValueError('Newline in header: ' + repr(line))
request_lines.append(line)
- self.stream.write(b("\r\n").join(request_lines) + b("\r\n\r\n"))
+ self.stream.write(b"\r\n".join(request_lines) + b"\r\n\r\n")
if self.request.body is not None:
self.stream.write(self.request.body)
- self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
+ self.stream.read_until_regex(b"\r?\n\r?\n", self._on_headers)
def _release(self):
if self.release_callback is not None:
assert match
code = int(match.group(1))
if 100 <= code < 200:
- self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
+ self.stream.read_until_regex(b"\r?\n\r?\n", self._on_headers)
return
else:
self.code = code
if self.request.method == "HEAD" or self.code == 304:
# HEAD requests and 304 responses never have content, even
# though they may have content-length headers
- self._on_body(b(""))
+ self._on_body(b"")
return
if 100 <= self.code < 200 or self.code == 204:
# These response codes never have bodies
content_length not in (None, 0)):
raise ValueError("Response with code %d should not have body" %
self.code)
- self._on_body(b(""))
+ self._on_body(b"")
return
if (self.request.use_gzip and
self._decompressor = GzipDecompressor()
if self.headers.get("Transfer-Encoding") == "chunked":
self.chunks = []
- self.stream.read_until(b("\r\n"), self._on_chunk_length)
+ self.stream.read_until(b"\r\n", self._on_chunk_length)
elif content_length is not None:
self.stream.read_bytes(content_length, self._on_body)
else:
# all the data has been decompressed, so we don't need to
# decompress again in _on_body
self._decompressor = None
- self._on_body(b('').join(self.chunks))
+ self._on_body(b''.join(self.chunks))
else:
self.stream.read_bytes(length + 2, # chunk ends with \r\n
self._on_chunk_data)
def _on_chunk_data(self, data):
- assert data[-2:] == b("\r\n")
+ assert data[-2:] == b"\r\n"
chunk = data[:-2]
if self._decompressor:
chunk = self._decompressor.decompress(chunk)
self.request.streaming_callback(chunk)
else:
self.chunks.append(chunk)
- self.stream.read_until(b("\r\n"), self._on_chunk_length)
+ self.stream.read_until(b"\r\n", self._on_chunk_length)
# match_hostname was added to the standard library ssl module in python 3.2.
self.finish(user)
def _oauth_get_user(self, access_token, callback):
- if access_token != dict(key=b('uiop'), secret=b('5678')):
+ if access_token != dict(key=b'uiop', secret=b'5678'):
raise Exception("incorrect access token %r" % access_token)
callback(dict(email='foo@example.com'))
def handle_stream(self, stream, address):
logging.info("handle_stream")
self.stream = stream
- self.stream.read_until(b("\n"), self.handle_read)
+ self.stream.read_until(b"\n", self.handle_read)
def handle_read(self, data):
logging.info("handle_read")
data = to_unicode(data)
if data == data.upper():
- self.stream.write(b("error\talready capitalized\n"))
+ self.stream.write(b"error\talready capitalized\n")
else:
# data already has \n
self.stream.write(utf8("ok\t%s" % data.upper()))
def handle_connect(self):
logging.info("handle_connect")
self.stream.write(utf8(self.request_data + "\n"))
- self.stream.read_until(b('\n'), callback=self.handle_read)
+ self.stream.read_until(b'\n', callback=self.handle_read)
def handle_read(self, data):
logging.info("handle_read")
def handle_connect(self):
logging.info("handle_connect")
self.stream.write(utf8(self.request_data + "\n"))
- self.stream.read_until(b('\n'), callback=self.handle_read)
+ self.stream.read_until(b'\n', callback=self.handle_read)
def handle_read(self, data):
logging.info("handle_read")
yield gen.Task(stream.connect, ('127.0.0.1', self.port))
stream.write(utf8(request_data + '\n'))
logging.info('reading')
- data = yield gen.Task(stream.read_until, b('\n'))
+ data = yield gen.Task(stream.read_until, b'\n')
logging.info('returning')
stream.close()
callback(self.process_response(data))
tests = [
("<foo>", "<foo>"),
(u("<foo>"), u("<foo>")),
- (b("<foo>"), b("<foo>")),
+ (b"<foo>", b"<foo>"),
("<>&\"", "<>&""),
("&", "&amp;"),
def test_json_decode(self):
# json_decode accepts both bytes and unicode, but strings it returns
# are always unicode.
- self.assertEqual(json_decode(b('"foo"')), u("foo"))
+ self.assertEqual(json_decode(b'"foo"'), u("foo"))
self.assertEqual(json_decode(u('"foo"')), u("foo"))
# Non-ascii bytes are interpreted as utf8
# accept bytes as well as long as they are utf8.
self.assertEqual(json_decode(json_encode(u("\u00e9"))), u("\u00e9"))
self.assertEqual(json_decode(json_encode(utf8(u("\u00e9")))), u("\u00e9"))
- self.assertRaises(UnicodeDecodeError, json_encode, b("\xe9"))
+ self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
client = AsyncHTTPClient(io_loop=io_loop)
response = yield gen.Task(client.fetch, self.get_argument('url'))
response.rethrow()
- self.finish(b("got response: ") + response.body)
+ self.finish(b"got response: " + response.body)
class GenExceptionHandler(RequestHandler):
def test_sequence_handler(self):
response = self.fetch('/sequence')
- self.assertEqual(response.body, b("123"))
+ self.assertEqual(response.body, b"123")
def test_task_handler(self):
response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
- self.assertEqual(response.body, b("got response: 123"))
+ self.assertEqual(response.body, b"got response: 123")
def test_exception_handler(self):
# Make sure we get an error and not a timeout
def test_yield_exception_handler(self):
response = self.fetch('/yield_exception')
- self.assertEqual(response.body, b('ok'))
+ self.assertEqual(response.body, b'ok')
response = self.fetch("/hello")
self.assertEqual(response.code, 200)
self.assertEqual(response.headers["Content-Type"], "text/plain")
- self.assertEqual(response.body, b("Hello world!"))
+ self.assertEqual(response.body, b"Hello world!")
self.assertEqual(int(response.request_time), 0)
response = self.fetch("/hello?name=Ben")
- self.assertEqual(response.body, b("Hello Ben!"))
+ self.assertEqual(response.body, b"Hello Ben!")
def test_streaming_callback(self):
# streaming_callback is also tested in test_chunked
response = self.fetch("/hello",
streaming_callback=chunks.append)
# with streaming_callback, data goes to the callback and not response.body
- self.assertEqual(chunks, [b("Hello world!")])
+ self.assertEqual(chunks, [b"Hello world!"])
self.assertFalse(response.body)
def test_post(self):
response = self.fetch("/post", method="POST",
body="arg1=foo&arg2=bar")
self.assertEqual(response.code, 200)
- self.assertEqual(response.body, b("Post arg1: foo, arg2: bar"))
+ self.assertEqual(response.body, b"Post arg1: foo, arg2: bar")
def test_chunked(self):
response = self.fetch("/chunk")
- self.assertEqual(response.body, b("asdfqwer"))
+ self.assertEqual(response.body, b"asdfqwer")
chunks = []
response = self.fetch("/chunk",
streaming_callback=chunks.append)
- self.assertEqual(chunks, [b("asdf"), b("qwer")])
+ self.assertEqual(chunks, [b"asdf", b"qwer"])
self.assertFalse(response.body)
def test_chunked_close(self):
2
0
-""").replace(b("\n"), b("\r\n")), callback=stream.close)
+""").replace(b"\n", b"\r\n"), callback=stream.close)
def accept_callback(conn, address):
# fake an HTTP server using chunked encoding where the final chunks
# and connection close all happen at once
stream = IOStream(conn, io_loop=self.io_loop)
- stream.read_until(b("\r\n\r\n"),
+ stream.read_until(b"\r\n\r\n",
functools.partial(write_response, stream))
netutil.add_accept_handler(sock, accept_callback, self.io_loop)
self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
resp = self.wait()
resp.rethrow()
- self.assertEqual(resp.body, b("12"))
+ self.assertEqual(resp.body, b"12")
self.io_loop.remove_handler(sock.fileno())
def test_streaming_stack_context(self):
def streaming_cb(chunk):
chunks.append(chunk)
- if chunk == b('qwer'):
+ if chunk == b'qwer':
1 / 0
with ExceptionStackContext(error_handler):
self.fetch('/chunk', streaming_callback=streaming_cb)
- self.assertEqual(chunks, [b('asdf'), b('qwer')])
+ self.assertEqual(chunks, [b'asdf', b'qwer'])
self.assertEqual(1, len(exc_info))
self.assertIs(exc_info[0][0], ZeroDivisionError)
def test_basic_auth(self):
self.assertEqual(self.fetch("/auth", auth_username="Aladdin",
auth_password="open sesame").body,
- b("Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="))
+ b"Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==")
def test_follow_redirect(self):
response = self.fetch("/countdown/2", follow_redirects=False)
response = self.fetch("/countdown/2")
self.assertEqual(200, response.code)
self.assertTrue(response.effective_url.endswith("/countdown/0"))
- self.assertEqual(b("Zero"), response.body)
+ self.assertEqual(b"Zero", response.body)
def test_credentials_in_url(self):
url = self.get_url("/auth").replace("http://", "http://me:secret@")
self.http_client.fetch(url, self.stop)
response = self.wait()
- self.assertEqual(b("Basic ") + base64.b64encode(b("me:secret")),
+ self.assertEqual(b"Basic " + base64.b64encode(b"me:secret"),
response.body)
def test_body_encoding(self):
unicode_body = u("\xe9")
- byte_body = binascii.a2b_hex(b("e9"))
+ byte_body = binascii.a2b_hex(b"e9")
# unicode string in body gets converted to utf8
response = self.fetch("/echopost", method="POST", body=unicode_body,
streaming_callback=streaming_callback)
self.assertEqual(len(first_line), 1)
self.assertRegexpMatches(first_line[0], 'HTTP/1.[01] 200 OK\r\n')
- self.assertEqual(chunks, [b('asdf'), b('qwer')])
+ self.assertEqual(chunks, [b'asdf', b'qwer'])
def test_header_callback_stack_context(self):
exc_info = []
defaults=defaults)
client.fetch(self.get_url('/user_agent'), callback=self.stop)
response = self.wait()
- self.assertEqual(response.body, b('TestDefaultUserAgent'))
+ self.assertEqual(response.body, b'TestDefaultUserAgent')
def test_304_with_content_length(self):
# According to the spec 304 responses SHOULD NOT include
def test_ssl(self):
response = self.fetch('/')
- self.assertEqual(response.body, b("Hello world"))
+ self.assertEqual(response.body, b"Hello world")
def test_large_post(self):
response = self.fetch('/',
method='POST',
body='A' * 5000)
- self.assertEqual(response.body, b("Got 5000 bytes in POST"))
+ self.assertEqual(response.body, b"Got 5000 bytes in POST")
def test_non_ssl_request(self):
# Make sure the server closes the connection when it gets a non-ssl
def _on_connect(self):
self.stream.write(self.__next_request)
self.__next_request = None
- self.stream.read_until(b("\r\n\r\n"), self._on_headers)
+ self.stream.read_until(b"\r\n\r\n", self._on_headers)
# This test is also called from wsgi_test
None, self.stop,
1024 * 1024)
conn.set_request(
- b("\r\n").join(headers +
+ b"\r\n".join(headers +
[utf8("Content-Length: %d\r\n" % len(body))]) +
- b("\r\n") + body)
+ b"\r\n" + body)
response = self.wait()
client.close()
response.rethrow()
# Encodings here are tricky: Headers are latin1, bodies can be
# anything (we use utf8 by default).
response = self.raw_fetch([
- b("POST /multipart HTTP/1.0"),
- b("Content-Type: multipart/form-data; boundary=1234567890"),
- b("X-Header-encoding-test: \xe9"),
+ b"POST /multipart HTTP/1.0",
+ b"Content-Type: multipart/form-data; boundary=1234567890",
+ b"X-Header-encoding-test: \xe9",
],
- b("\r\n").join([
- b("Content-Disposition: form-data; name=argument"),
- b(""),
+ b"\r\n".join([
+ b"Content-Disposition: form-data; name=argument",
+ b"",
u("\u00e1").encode("utf-8"),
- b("--1234567890"),
+ b"--1234567890",
u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
- b(""),
+ b"",
u("\u00fa").encode("utf-8"),
- b("--1234567890--"),
- b(""),
+ b"--1234567890--",
+ b"",
]))
data = json_decode(response.body)
self.assertEqual(u("\u00e9"), data["header"])
stream = IOStream(socket.socket(), io_loop=self.io_loop)
stream.connect(("localhost", self.get_http_port()), callback=self.stop)
self.wait()
- stream.write(b("\r\n").join([b("POST /hello HTTP/1.1"),
- b("Content-Length: 1024"),
- b("Expect: 100-continue"),
- b("Connection: close"),
- b("\r\n")]), callback=self.stop)
+ stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
+ b"Content-Length: 1024",
+ b"Expect: 100-continue",
+ b"Connection: close",
+ b"\r\n"]), callback=self.stop)
self.wait()
- stream.read_until(b("\r\n\r\n"), self.stop)
+ stream.read_until(b"\r\n\r\n", self.stop)
data = self.wait()
- self.assertTrue(data.startswith(b("HTTP/1.1 100 ")), data)
- stream.write(b("a") * 1024)
- stream.read_until(b("\r\n"), self.stop)
+ self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data)
+ stream.write(b"a" * 1024)
+ stream.read_until(b"\r\n", self.stop)
first_line = self.wait()
- self.assertTrue(first_line.startswith(b("HTTP/1.1 200")), first_line)
- stream.read_until(b("\r\n\r\n"), self.stop)
+ self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line)
+ stream.read_until(b"\r\n\r\n", self.stop)
header_data = self.wait()
headers = HTTPHeaders.parse(native_str(header_data.decode('latin1')))
stream.read_bytes(int(headers["Content-Length"]), self.stop)
body = self.wait()
- self.assertEqual(body, b("Got 1024 bytes in POST"))
+ self.assertEqual(body, b"Got 1024 bytes in POST")
stream.close()
stream = IOStream(socket.socket(socket.AF_UNIX), io_loop=self.io_loop)
stream.connect(sockfile, self.stop)
self.wait()
- stream.write(b("GET /hello HTTP/1.0\r\n\r\n"))
- stream.read_until(b("\r\n"), self.stop)
+ stream.write(b"GET /hello HTTP/1.0\r\n\r\n")
+ stream.read_until(b"\r\n", self.stop)
response = self.wait()
- self.assertEqual(response, b("HTTP/1.0 200 OK\r\n"))
- stream.read_until(b("\r\n\r\n"), self.stop)
+ self.assertEqual(response, b"HTTP/1.0 200 OK\r\n")
+ stream.read_until(b"\r\n\r\n", self.stop)
headers = HTTPHeaders.parse(self.wait().decode('latin1'))
stream.read_bytes(int(headers["Content-Length"]), self.stop)
body = self.wait()
- self.assertEqual(body, b("Hello world"))
+ self.assertEqual(body, b"Hello world")
stream.close()
server.stop()
UnixSocketTest = unittest.skipIf(
def setUp(self):
super(KeepAliveTest, self).setUp()
- self.http_version = b('HTTP/1.1')
+ self.http_version = b'HTTP/1.1'
def tearDown(self):
# We just closed the client side of the socket; let the IOLoop run
self.wait()
def read_headers(self):
- self.stream.read_until(b('\r\n'), self.stop)
+ self.stream.read_until(b'\r\n', self.stop)
first_line = self.wait()
- self.assertTrue(first_line.startswith(self.http_version + b(' 200')), first_line)
- self.stream.read_until(b('\r\n\r\n'), self.stop)
+ self.assertTrue(first_line.startswith(self.http_version + b' 200'), first_line)
+ self.stream.read_until(b'\r\n\r\n', self.stop)
header_bytes = self.wait()
headers = HTTPHeaders.parse(header_bytes.decode('latin1'))
return headers
headers = self.read_headers()
self.stream.read_bytes(int(headers['Content-Length']), self.stop)
body = self.wait()
- self.assertEqual(b('Hello world'), body)
+ self.assertEqual(b'Hello world', body)
def close(self):
self.stream.close()
def test_two_requests(self):
self.connect()
- self.stream.write(b('GET / HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
self.read_response()
- self.stream.write(b('GET / HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.1\r\n\r\n')
self.read_response()
self.close()
def test_request_close(self):
self.connect()
- self.stream.write(b('GET / HTTP/1.1\r\nConnection: close\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.1\r\nConnection: close\r\n\r\n')
self.read_response()
self.stream.read_until_close(callback=self.stop)
data = self.wait()
# keepalive is supported for http 1.0 too, but it's opt-in
def test_http10(self):
- self.http_version = b('HTTP/1.0')
+ self.http_version = b'HTTP/1.0'
self.connect()
- self.stream.write(b('GET / HTTP/1.0\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.0\r\n\r\n')
self.read_response()
self.stream.read_until_close(callback=self.stop)
data = self.wait()
self.close()
def test_http10_keepalive(self):
- self.http_version = b('HTTP/1.0')
+ self.http_version = b'HTTP/1.0'
self.connect()
- self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
self.read_response()
- self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')
self.read_response()
self.close()
def test_pipelined_requests(self):
self.connect()
- self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
self.read_response()
self.read_response()
self.close()
def test_pipelined_cancel(self):
self.connect()
- self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')
# only read once
self.read_response()
self.close()
def test_cancel_during_download(self):
self.connect()
- self.stream.write(b('GET /large HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET /large HTTP/1.1\r\n\r\n')
self.read_headers()
self.stream.read_bytes(1024, self.stop)
self.wait()
def test_finish_while_closed(self):
self.connect()
- self.stream.write(b('GET /finish_on_close HTTP/1.1\r\n\r\n'))
+ self.stream.write(b'GET /finish_on_close HTTP/1.1\r\n\r\n')
self.read_headers()
self.close()
Content-Disposition: form-data; name="files"; filename="ab.txt"
Foo
---1234--""").replace(b("\n"), b("\r\n"))
+--1234--""").replace(b"\n", b"\r\n")
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
- self.assertEqual(file["body"], b("Foo"))
+ self.assertEqual(file["body"], b"Foo")
def test_unquoted_names(self):
# quotes are optional unless special characters are present
Content-Disposition: form-data; name=files; filename=ab.txt
Foo
---1234--""").replace(b("\n"), b("\r\n"))
+--1234--""").replace(b"\n", b"\r\n")
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
- self.assertEqual(file["body"], b("Foo"))
+ self.assertEqual(file["body"], b"Foo")
def test_special_filenames(self):
filenames = ['a;b.txt',
data = utf8(data.replace("\n", "\r\n"))
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], filename)
- self.assertEqual(file["body"], b("Foo"))
+ self.assertEqual(file["body"], b"Foo")
def test_boundary_starts_and_ends_with_quotes(self):
data = b('''\
Content-Disposition: form-data; name="files"; filename="ab.txt"
Foo
---1234--''').replace(b("\n"), b("\r\n"))
+--1234--''').replace(b"\n", b"\r\n")
args = {}
files = {}
- parse_multipart_form_data(b('"1234"'), data, args, files)
+ parse_multipart_form_data(b'"1234"', data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
- self.assertEqual(file["body"], b("Foo"))
+ self.assertEqual(file["body"], b"Foo")
def test_missing_headers(self):
data = b('''\
--1234
Foo
---1234--''').replace(b("\n"), b("\r\n"))
+--1234--''').replace(b"\n", b"\r\n")
args = {}
files = {}
with ExpectLog(gen_log, "multipart/form-data missing headers"):
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_invalid_content_disposition(self):
Content-Disposition: invalid; name="files"; filename="ab.txt"
Foo
---1234--''').replace(b("\n"), b("\r\n"))
+--1234--''').replace(b"\n", b"\r\n")
args = {}
files = {}
with ExpectLog(gen_log, "Invalid multipart/form-data"):
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_line_does_not_end_with_correct_line_break(self):
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"
-Foo--1234--''').replace(b("\n"), b("\r\n"))
+Foo--1234--''').replace(b"\n", b"\r\n")
args = {}
files = {}
with ExpectLog(gen_log, "Invalid multipart/form-data"):
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_content_disposition_header_without_name_parameter(self):
Content-Disposition: form-data; filename="ab.txt"
Foo
---1234--""").replace(b("\n"), b("\r\n"))
+--1234--""").replace(b"\n", b"\r\n")
args = {}
files = {}
with ExpectLog(gen_log, "multipart/form-data value missing name"):
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_data_after_final_boundary(self):
Foo
--1234--
-""").replace(b("\n"), b("\r\n"))
+""").replace(b"\n", b"\r\n")
args = {}
files = {}
- parse_multipart_form_data(b("1234"), data, args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
- self.assertEqual(file["body"], b("Foo"))
+ self.assertEqual(file["body"], b"Foo")
class HTTPHeadersTest(unittest.TestCase):
stream = self._make_client_iostream()
stream.connect(('localhost', self.get_http_port()), callback=self.stop)
self.wait()
- stream.write(b("GET / HTTP/1.0\r\n\r\n"))
+ stream.write(b"GET / HTTP/1.0\r\n\r\n")
stream.read_until_close(self.stop)
data = self.wait()
- self.assertTrue(data.startswith(b("HTTP/1.0 200")))
- self.assertTrue(data.endswith(b("Hello")))
+ self.assertTrue(data.startswith(b"HTTP/1.0 200"))
+ self.assertTrue(data.endswith(b"Hello"))
def test_read_zero_bytes(self):
self.stream = self._make_client_iostream()
self.stream.connect(("localhost", self.get_http_port()),
callback=self.stop)
self.wait()
- self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))
+ self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
# normal read
self.stream.read_bytes(9, self.stop)
data = self.wait()
- self.assertEqual(data, b("HTTP/1.0 "))
+ self.assertEqual(data, b"HTTP/1.0 ")
# zero bytes
self.stream.read_bytes(0, self.stop)
data = self.wait()
- self.assertEqual(data, b(""))
+ self.assertEqual(data, b"")
# another normal read
self.stream.read_bytes(3, self.stop)
data = self.wait()
- self.assertEqual(data, b("200"))
+ self.assertEqual(data, b"200")
self.stream.close()
def write_callback():
written[0] = True
self.stop()
- stream.write(b("GET / HTTP/1.0\r\nConnection: close\r\n\r\n"),
+ stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n",
callback=write_callback)
self.assertTrue(not connected[0])
# by the time the write has flushed, the connection callback has
stream.read_until_close(self.stop)
data = self.wait()
- self.assertTrue(data.endswith(b("Hello")))
+ self.assertTrue(data.endswith(b"Hello"))
stream.close()
# Attempting to write zero bytes should run the callback without
# going into an infinite loop.
server, client = self.make_iostream_pair()
- server.write(b(''), callback=self.stop)
+ server.write(b'', callback=self.stop)
self.wait()
# As a side effect, the stream is now listening for connection
# close (if it wasn't already), but is not listening for writes
# Clear ExceptionStackContext so IOStream catches error
with NullContext():
server.read_bytes(1, callback=lambda data: 1 / 0)
- client.write(b("1"))
+ client.write(b"1")
self.wait()
self.assertTrue(isinstance(server.error, ZeroDivisionError))
finally:
self.stop()
server.read_bytes(6, callback=final_callback,
streaming_callback=streaming_callback)
- client.write(b("1234"))
+ client.write(b"1234")
self.wait(condition=lambda: chunks)
- client.write(b("5678"))
+ client.write(b"5678")
self.wait(condition=lambda: final_called)
- self.assertEqual(chunks, [b("1234"), b("56")])
+ self.assertEqual(chunks, [b"1234", b"56"])
# the rest of the last chunk is still in the buffer
server.read_bytes(2, callback=self.stop)
data = self.wait()
- self.assertEqual(data, b("78"))
+ self.assertEqual(data, b"78")
finally:
server.close()
client.close()
self.stop()
client.read_until_close(callback=callback,
streaming_callback=callback)
- server.write(b("1234"))
+ server.write(b"1234")
self.wait()
- server.write(b("5678"))
+ server.write(b"5678")
self.wait()
server.close()
self.wait()
- self.assertEqual(chunks, [b("1234"), b("5678"), b("")])
+ self.assertEqual(chunks, [b"1234", b"5678", b""])
finally:
server.close()
client.close()
server, client = self.make_iostream_pair()
try:
client.set_close_callback(self.stop)
- server.write(b("12"))
+ server.write(b"12")
chunks = []
def callback1(data):
chunks.append(data)
client.read_bytes(1, callback1)
self.wait() # stopped by close_callback
- self.assertEqual(chunks, [b("1"), b("2")])
+ self.assertEqual(chunks, [b"1", b"2"])
finally:
server.close()
client.close()
# OS socket buffer, so make it small.
server, client = self.make_iostream_pair(read_chunk_size=256)
try:
- server.write(b("A") * 512)
+ server.write(b"A" * 512)
client.read_bytes(256, self.stop)
data = self.wait()
- self.assertEqual(b("A") * 256, data)
+ self.assertEqual(b"A" * 256, data)
server.close()
# Allow the close to propagate to the client side of the
# connection. Using add_callback instead of add_timeout
self.wait()
client.read_bytes(256, self.stop)
data = self.wait()
- self.assertEqual(b("A") * 256, data)
+ self.assertEqual(b"A" * 256, data)
finally:
server.close()
client.close()
server, client = self.make_iostream_pair()
client.set_close_callback(self.stop)
try:
- server.write(b("1234"))
+ server.write(b"1234")
server.close()
self.wait()
client.read_until_close(self.stop)
data = self.wait()
- self.assertEqual(data, b("1234"))
+ self.assertEqual(data, b"1234")
finally:
server.close()
client.close()
server, client = self.make_iostream_pair()
client.set_close_callback(self.stop)
try:
- server.write(b("1234"))
+ server.write(b"1234")
server.close()
self.wait()
streaming_data = []
client.read_until_close(self.stop,
streaming_callback=streaming_data.append)
data = self.wait()
- self.assertEqual(b(''), data)
- self.assertEqual(b('').join(streaming_data), b("1234"))
+ self.assertEqual(b'', data)
+ self.assertEqual(b''.join(streaming_data), b"1234")
finally:
server.close()
client.close()
pass
NUM_KB = 4096
for i in range(NUM_KB):
- client.write(b("A") * 1024)
- client.write(b("\r\n"))
- server.read_until(b("\r\n"), self.stop)
+ client.write(b"A" * 1024)
+ client.write(b"\r\n")
+ server.read_until(b"\r\n", self.stop)
data = self.wait()
self.assertEqual(len(data), NUM_KB * 1024 + 2)
finally:
# Regression test for a bug that was introduced in 2.3
# where the IOStream._close_callback would never be called
# if there were pending reads.
- OK = b("OK\r\n")
+ OK = b"OK\r\n"
server, client = self.make_iostream_pair()
client.set_close_callback(self.stop)
try:
server.write(OK)
- client.read_until(b("\r\n"), self.stop)
+ client.read_until(b"\r\n", self.stop)
res = self.wait()
self.assertEqual(res, OK)
server.close()
- client.read_until(b("\r\n"), lambda x: x)
+ client.read_until(b"\r\n", lambda x: x)
# If _close_callback (self.stop) is not called,
# an AssertionError: Async operation timed out after 5 seconds
# will be raised.
try:
# Start a read that will be fullfilled asynchronously.
server.read_bytes(1, lambda data: None)
- client.write(b('a'))
+ client.write(b'a')
# Stub out read_from_fd to make it fail.
def fake_read_from_fd():
os.close(server.socket.fileno())
rs = PipeIOStream(r, io_loop=self.io_loop)
ws = PipeIOStream(w, io_loop=self.io_loop)
- ws.write(b("hel"))
- ws.write(b("lo world"))
+ ws.write(b"hel")
+ ws.write(b"lo world")
- rs.read_until(b(' '), callback=self.stop)
+ rs.read_until(b' ', callback=self.stop)
data = self.wait()
- self.assertEqual(data, b("hello "))
+ self.assertEqual(data, b"hello ")
rs.read_bytes(3, self.stop)
data = self.wait()
- self.assertEqual(data, b("wor"))
+ self.assertEqual(data, b"wor")
ws.close()
rs.read_until_close(self.stop)
data = self.wait()
- self.assertEqual(data, b("ld"))
+ self.assertEqual(data, b"ld")
rs.close()
TestPipeIOStream = skipIfNonUnix(TestPipeIOStream)
name = tornado.locale.LOCALE_NAMES['es_LA']['name']
self.assertTrue(isinstance(name, unicode_type))
self.assertEqual(name, u('Espa\u00f1ol'))
- self.assertEqual(utf8(name), b('Espa\xc3\xb1ol'))
+ self.assertEqual(utf8(name), b'Espa\xc3\xb1ol')
class LogFormatterTest(unittest.TestCase):
- LINE_RE = re.compile(b("\x01\\[E [0-9]{6} [0-9]{2}:[0-9]{2}:[0-9]{2} log_test:[0-9]+\\]\x02 (.*)"))
+ LINE_RE = re.compile(b"\x01\\[E [0-9]{6} [0-9]{2}:[0-9]{2}:[0-9]{2} log_test:[0-9]+\\]\x02 (.*)")
def setUp(self):
self.formatter = LogFormatter(color=False)
def test_basic_logging(self):
self.logger.error("foo")
- self.assertEqual(self.get_output(), b("foo"))
+ self.assertEqual(self.get_output(), b"foo")
def test_bytes_logging(self):
with ignore_bytes_warning():
# This will be "\xe9" on python 2 or "b'\xe9'" on python 3
- self.logger.error(b("\xe9"))
- self.assertEqual(self.get_output(), utf8(repr(b("\xe9"))))
+ self.logger.error(b"\xe9")
+ self.assertEqual(self.get_output(), utf8(repr(b"\xe9")))
def test_utf8_logging(self):
self.logger.error(u("\u00e9").encode("utf8"))
stdout=Subprocess.STREAM, stderr=subprocess.STDOUT,
io_loop=self.io_loop)
self.addCleanup(lambda: os.kill(subproc.pid, signal.SIGTERM))
- subproc.stdout.read_until(b('>>> '), self.stop)
+ subproc.stdout.read_until(b'>>> ', self.stop)
self.wait()
- subproc.stdin.write(b("print('hello')\n"))
- subproc.stdout.read_until(b('\n'), self.stop)
+ subproc.stdin.write(b"print('hello')\n")
+ subproc.stdout.read_until(b'\n', self.stop)
data = self.wait()
- self.assertEqual(data, b("hello\n"))
+ self.assertEqual(data, b"hello\n")
- subproc.stdout.read_until(b(">>> "), self.stop)
+ subproc.stdout.read_until(b">>> ", self.stop)
self.wait()
- subproc.stdin.write(b("raise SystemExit\n"))
+ subproc.stdin.write(b"raise SystemExit\n")
subproc.stdout.read_until_close(self.stop)
data = self.wait()
- self.assertEqual(data, b(""))
+ self.assertEqual(data, b"")
def test_sigchild(self):
# Twisted's SIGCHLD handler and Subprocess's conflict with each other.
response = self.fetch("/chunk", use_gzip=False,
headers={"Accept-Encoding": "gzip"})
self.assertEqual(response.headers["Content-Encoding"], "gzip")
- self.assertNotEqual(response.body, b("asdfqwer"))
+ self.assertNotEqual(response.body, b"asdfqwer")
# Our test data gets bigger when gzipped. Oops. :)
self.assertEqual(len(response.body), 34)
f = gzip.GzipFile(mode="r", fileobj=response.buffer)
- self.assertEqual(f.read(), b("asdfqwer"))
+ self.assertEqual(f.read(), b"asdfqwer")
def test_max_redirects(self):
response = self.fetch("/countdown/5", max_redirects=3)
self.http_client.fetch(url, self.stop, allow_ipv6=True)
response = self.wait()
- self.assertEqual(response.body, b("Hello world!"))
+ self.assertEqual(response.body, b"Hello world!")
def test_multiple_content_length_accepted(self):
response = self.fetch("/content_length?value=2,2")
- self.assertEqual(response.body, b("ok"))
+ self.assertEqual(response.body, b"ok")
response = self.fetch("/content_length?value=2,%202,2")
- self.assertEqual(response.body, b("ok"))
+ self.assertEqual(response.body, b"ok")
with ExpectLog(gen_log, "uncaught exception"):
response = self.fetch("/content_length?value=2,4")
self.assertEqual(response.code, 200)
self.assertEqual(response.headers["content-length"], "2")
self.assertEqual(response.headers["access-control-allow-origin"], "*")
- self.assertEqual(response.body, b("ok"))
+ self.assertEqual(response.body, b"ok")
def test_no_content(self):
response = self.fetch("/no_content")
self.assertEqual(response.code, 599)
def test_host_header(self):
- host_re = re.compile(b("^localhost:[0-9]+$"))
+ host_re = re.compile(b"^localhost:[0-9]+$")
response = self.fetch("/host_echo")
self.assertTrue(host_re.match(response.body))
def respond_100(self, request):
self.request = request
self.request.connection.stream.write(
- b("HTTP/1.1 100 CONTINUE\r\n\r\n"),
+ b"HTTP/1.1 100 CONTINUE\r\n\r\n",
self.respond_200)
def respond_200(self):
self.request.connection.stream.write(
- b("HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nA"),
+ b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nA",
self.request.connection.stream.close)
def get_app(self):
def test_100_continue(self):
res = self.fetch('/')
- self.assertEqual(res.body, b('A'))
+ self.assertEqual(res.body, b'A')
self.http_client.fetch(self.get_url('/'), self.handle_response)
self.wait()
self.assertEqual(self.response.code, 500)
- self.assertTrue(b('got expected exception') in self.response.body)
+ self.assertTrue(b'got expected exception' in self.response.body)
def handle_response(self, response):
self.response = response
def test_simple(self):
template = Template("Hello {{ name }}!")
self.assertEqual(template.generate(name="Ben"),
- b("Hello Ben!"))
+ b"Hello Ben!")
def test_bytes(self):
template = Template("Hello {{ name }}!")
self.assertEqual(template.generate(name=utf8("Ben")),
- b("Hello Ben!"))
+ b"Hello Ben!")
def test_expressions(self):
template = Template("2 + 2 = {{ 2 + 2 }}")
- self.assertEqual(template.generate(), b("2 + 2 = 4"))
+ self.assertEqual(template.generate(), b"2 + 2 = 4")
def test_comment(self):
template = Template("Hello{# TODO i18n #} {{ name }}!")
self.assertEqual(template.generate(name=utf8("Ben")),
- b("Hello Ben!"))
+ b"Hello Ben!")
def test_include(self):
loader = DictLoader({
"header.html": "header text",
})
self.assertEqual(loader.load("index.html").generate(),
- b("header text\nbody text"))
+ b"header text\nbody text")
def test_extends(self):
loader = DictLoader({
""",
})
self.assertEqual(loader.load("page.html").generate(),
- b("<title>page title</title>\n<body>page body</body>\n"))
+ b"<title>page title</title>\n<body>page body</body>\n")
def test_relative_load(self):
loader = DictLoader({
"b/3.html": "ok",
})
self.assertEqual(loader.load("a/1.html").generate(),
- b("ok"))
+ b"ok")
def test_escaping(self):
self.assertRaises(ParseError, lambda: Template("{{"))
self.assertRaises(ParseError, lambda: Template("{%"))
- self.assertEqual(Template("{{!").generate(), b("{{"))
- self.assertEqual(Template("{%!").generate(), b("{%"))
+ self.assertEqual(Template("{{!").generate(), b"{{")
+ self.assertEqual(Template("{%!").generate(), b"{%")
self.assertEqual(Template("{{ 'expr' }} {{!jquery expr}}").generate(),
- b("expr {{jquery expr}}"))
+ b"expr {{jquery expr}}")
def test_unicode_template(self):
template = Template(utf8(u("\u00e9")))
def test_custom_namespace(self):
loader = DictLoader({"test.html": "{{ inc(5) }}"}, namespace={"inc": lambda x: x + 1})
- self.assertEqual(loader.load("test.html").generate(), b("6"))
+ self.assertEqual(loader.load("test.html").generate(), b"6")
def test_apply(self):
def upper(s):
return s.upper()
template = Template(utf8("{% apply upper %}foo{% end %}"))
- self.assertEqual(template.generate(upper=upper), b("FOO"))
+ self.assertEqual(template.generate(upper=upper), b"FOO")
def test_unicode_apply(self):
def upper(s):
def test_if(self):
template = Template(utf8("{% if x > 4 %}yes{% else %}no{% end %}"))
- self.assertEqual(template.generate(x=5), b("yes"))
- self.assertEqual(template.generate(x=3), b("no"))
+ self.assertEqual(template.generate(x=5), b"yes")
+ self.assertEqual(template.generate(x=3), b"no")
def test_if_empty_body(self):
template = Template(utf8("{% if True %}{% else %}{% end %}"))
- self.assertEqual(template.generate(), b(""))
+ self.assertEqual(template.generate(), b"")
def test_try(self):
template = Template(utf8("""{% try %}
{% else %}-else
{% finally %}-finally
{% end %}"""))
- self.assertEqual(template.generate(x=1), b("\ntry\n-else\n-finally\n"))
- self.assertEqual(template.generate(x=0), b("\ntry-except\n-finally\n"))
+ self.assertEqual(template.generate(x=1), b"\ntry\n-else\n-finally\n")
+ self.assertEqual(template.generate(x=0), b"\ntry-except\n-finally\n")
def test_comment_directive(self):
template = Template(utf8("{% comment blah blah %}foo"))
- self.assertEqual(template.generate(), b("foo"))
+ self.assertEqual(template.generate(), b"foo")
def test_break_continue(self):
template = Template(utf8("""\
{% end %}"""))
result = template.generate()
# remove extraneous whitespace
- result = b('').join(result.split())
- self.assertEqual(result, b("013456"))
+ result = b''.join(result.split())
+ self.assertEqual(result, b"013456")
def test_break_outside_loop(self):
try:
loader = DictLoader(self.templates, autoescape=None)
name = "Bobby <table>s"
self.assertEqual(loader.load("escaped.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("unescaped.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("default.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("include.html").generate(name=name),
b("escaped: Bobby <table>s\n"
loader = DictLoader(self.templates, autoescape="xhtml_escape")
name = "Bobby <table>s"
self.assertEqual(loader.load("escaped.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("unescaped.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("default.html").generate(name=name),
- b("Bobby <table>s"))
+ b"Bobby <table>s")
self.assertEqual(loader.load("include.html").generate(name=name),
b("escaped: Bobby <table>s\n"
loader = DictLoader(self.templates)
name = "<script>"
self.assertEqual(loader.load("escaped_block.html").generate(name=name),
- b("base: <script>"))
+ b"base: <script>")
self.assertEqual(loader.load("unescaped_block.html").generate(name=name),
- b("base: <script>"))
+ b"base: <script>")
def test_extended_block(self):
loader = DictLoader(self.templates)
def render(name):
return loader.load(name).generate(name="<script>")
self.assertEqual(render("escaped_extends_unescaped.html"),
- b("base: <script>"))
+ b"base: <script>")
self.assertEqual(render("escaped_overrides_unescaped.html"),
- b("extended: <script>"))
+ b"extended: <script>")
self.assertEqual(render("unescaped_extends_escaped.html"),
- b("base: <script>"))
+ b"base: <script>")
self.assertEqual(render("unescaped_overrides_escaped.html"),
- b("extended: <script>"))
+ b"extended: <script>")
def test_raw_expression(self):
loader = DictLoader(self.templates)
return loader.load(template).generate(py_escape=py_escape,
name=name)
self.assertEqual(render("foo.py", "<html>"),
- b("s = '<html>'\n"))
+ b"s = '<html>'\n")
self.assertEqual(render("foo.py", "';sys.exit()"),
b("""s = "';sys.exit()"\n"""))
self.assertEqual(render("foo.py", ["not a string"]),
class UnicodeLiteralTest(unittest.TestCase):
def test_unicode_escapes(self):
- self.assertEqual(utf8(u('\u00e9')), b('\xc3\xa9'))
+ self.assertEqual(utf8(u('\u00e9')), b'\xc3\xa9')
class SecureCookieTest(unittest.TestCase):
def test_round_trip(self):
handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b('bar'))
- self.assertEqual(handler.get_secure_cookie('foo'), b('bar'))
+ handler.set_secure_cookie('foo', b'bar')
+ self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
def test_cookie_tampering_future_timestamp(self):
handler = CookieTestRequestHandler()
# this string base64-encodes to '12345678'
- handler.set_secure_cookie('foo', binascii.a2b_hex(b('d76df8e7aefc')))
+ handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'))
cookie = handler._cookies['foo']
match = re.match(b(r'12345678\|([0-9]+)\|([0-9a-f]+)'), cookie)
self.assertTrue(match)
# works)
self.assertEqual(
_create_signature(handler.application.settings["cookie_secret"],
- 'foo', '1234', b('5678') + timestamp),
+ 'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
# Secure cookies accept arbitrary data (which is base64 encoded).
# Note that normal cookies accept only a subset of ascii.
handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b('\xe9'))
- self.assertEqual(handler.get_secure_cookie('foo'), b('\xe9'))
+ handler.set_secure_cookie('foo', b'\xe9')
+ self.assertEqual(handler.get_secure_cookie('foo'), b'\xe9')
class CookieTest(WebTestCase):
# to ensure that everything gets encoded correctly
self.set_cookie("str", "asdf")
self.set_cookie("unicode", u("qwer"))
- self.set_cookie("bytes", b("zxcv"))
+ self.set_cookie("bytes", b"zxcv")
class GetCookieHandler(RequestHandler):
def get(self):
def test_get_cookie(self):
response = self.fetch("/get", headers={"Cookie": "foo=bar"})
- self.assertEqual(response.body, b("bar"))
+ self.assertEqual(response.body, b"bar")
response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
- self.assertEqual(response.body, b("bar"))
+ self.assertEqual(response.body, b"bar")
response = self.fetch("/get", headers={"Cookie": "/=exception;"})
- self.assertEqual(response.body, b("default"))
+ self.assertEqual(response.body, b"default")
def test_set_cookie_domain(self):
response = self.fetch("/set_domain")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect(("localhost", self.get_http_port()))
self.stream = IOStream(s, io_loop=self.io_loop)
- self.stream.write(b("GET / HTTP/1.0\r\n\r\n"))
+ self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.wait()
def on_handler_waiting(self):
raise Exception("Didn't get expected exception")
except ValueError as e:
if "Unsafe header value" in str(e):
- self.finish(b("ok"))
+ self.finish(b"ok")
else:
raise
'/decode_arg/foo')
self.assertEqual(self.app.reverse_url('decode_arg', 42),
'/decode_arg/42')
- self.assertEqual(self.app.reverse_url('decode_arg', b('\xe9')),
+ self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'),
'/decode_arg/%E9')
self.assertEqual(self.app.reverse_url('decode_arg', u('\u00e9')),
'/decode_arg/%C3%A9')
def test_header_injection(self):
response = self.fetch("/header_injection")
- self.assertEqual(response.body, b("ok"))
+ self.assertEqual(response.body, b"ok")
def test_get_argument(self):
response = self.fetch("/get_argument?foo=bar")
- self.assertEqual(response.body, b("bar"))
+ self.assertEqual(response.body, b"bar")
response = self.fetch("/get_argument?foo=")
- self.assertEqual(response.body, b(""))
+ self.assertEqual(response.body, b"")
response = self.fetch("/get_argument")
- self.assertEqual(response.body, b("default"))
+ self.assertEqual(response.body, b"default")
def test_no_gzip(self):
response = self.fetch('/get_argument')
]
def test_flow_control(self):
- self.assertEqual(self.fetch("/flow_control").body, b("123"))
+ self.assertEqual(self.fetch("/flow_control").body, b"123")
def test_empty_flush(self):
response = self.fetch("/empty_flush")
- self.assertEqual(response.body, b("ok"))
+ self.assertEqual(response.body, b"ok")
class ErrorResponseTest(WebTestCase):
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/default")
self.assertEqual(response.code, 500)
- self.assertTrue(b("500: Internal Server Error") in response.body)
+ self.assertTrue(b"500: Internal Server Error" in response.body)
response = self.fetch("/default?status=503")
self.assertEqual(response.code, 503)
- self.assertTrue(b("503: Service Unavailable") in response.body)
+ self.assertTrue(b"503: Service Unavailable" in response.body)
def test_write_error(self):
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/write_error")
self.assertEqual(response.code, 500)
- self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
+ self.assertEqual(b"Exception: ZeroDivisionError", response.body)
response = self.fetch("/write_error?status=503")
self.assertEqual(response.code, 503)
- self.assertEqual(b("Status: 503"), response.body)
+ self.assertEqual(b"Status: 503", response.body)
def test_get_error_html(self):
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/get_error_html")
self.assertEqual(response.code, 500)
- self.assertEqual(b("Exception: ZeroDivisionError"), response.body)
+ self.assertEqual(b"Exception: ZeroDivisionError", response.body)
response = self.fetch("/get_error_html?status=503")
self.assertEqual(response.code, 503)
- self.assertEqual(b("Status: 503"), response.body)
+ self.assertEqual(b"Status: 503", response.body)
def test_failed_write_error(self):
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/failed_write_error")
self.assertEqual(response.code, 500)
- self.assertEqual(b(""), response.body)
+ self.assertEqual(b"", response.body)
wsgi_safe.append(ErrorResponseTest)
class StaticFileTest(WebTestCase):
def test_static_files(self):
response = self.fetch('/robots.txt')
- self.assertTrue(b("Disallow: /") in response.body)
+ self.assertTrue(b"Disallow: /" in response.body)
response = self.fetch('/static/robots.txt')
- self.assertTrue(b("Disallow: /") in response.body)
+ self.assertTrue(b"Disallow: /" in response.body)
def test_static_url(self):
response = self.fetch("/static_url/robots.txt")
- self.assertEqual(response.body, b("/static/robots.txt?v=f71d2"))
+ self.assertEqual(response.body, b"/static/robots.txt?v=f71d2")
def test_absolute_static_url(self):
response = self.fetch("/abs_static_url/robots.txt")
def test_serve(self):
response = self.fetch("/static/foo.42.txt")
- self.assertEqual(response.body, b("bar"))
+ self.assertEqual(response.body, b"bar")
def test_static_url(self):
with ExpectLog(gen_log, "Could not open static file", required=False):
response = self.fetch("/static_url/foo.txt")
- self.assertEqual(response.body, b("/static/foo.42.txt"))
+ self.assertEqual(response.body, b"/static/foo.42.txt")
wsgi_safe.append(CustomStaticFileTest)
[("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
response = self.fetch("/foo")
- self.assertEqual(response.body, b("wildcard"))
+ self.assertEqual(response.body, b"wildcard")
response = self.fetch("/bar")
self.assertEqual(response.code, 404)
response = self.fetch("/baz")
self.assertEqual(response.code, 404)
response = self.fetch("/foo", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b("[0]"))
+ self.assertEqual(response.body, b"[0]")
response = self.fetch("/bar", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b("[1]"))
+ self.assertEqual(response.body, b"[1]")
response = self.fetch("/baz", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b("[2]"))
+ self.assertEqual(response.body, b"[2]")
wsgi_safe.append(HostMatchingTest)
def test_named_urlspec_groups(self):
response = self.fetch("/str/foo")
- self.assertEqual(response.body, b("foo"))
+ self.assertEqual(response.body, b"foo")
response = self.fetch("/unicode/bar")
- self.assertEqual(response.body, b("bar"))
+ self.assertEqual(response.body, b"bar")
wsgi_safe.append(NamedURLSpecGroupsTest)
response = self.fetch("/")
self.assertEqual(response.code, 682)
self.assertEqual(response.reason, "Foo")
- self.assertIn(b('682: Foo'), response.body)
+ self.assertIn(b'682: Foo', response.body)
def test_httperror_str(self):
self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
status = "200 OK"
response_headers = [("Content-Type", "text/plain")]
start_response(status, response_headers)
- return [b("Hello world!")]
+ return [b"Hello world!"]
def get_app(self):
return WSGIContainer(validator(self.wsgi_app))
def test_simple(self):
response = self.fetch("/")
- self.assertEqual(response.body, b("Hello world!"))
+ self.assertEqual(response.body, b"Hello world!")
class WSGIApplicationTest(AsyncHTTPTestCase):
def test_simple(self):
response = self.fetch("/")
- self.assertEqual(response.body, b("Hello world!"))
+ self.assertEqual(response.body, b"Hello world!")
def test_path_quoting(self):
response = self.fetch("/path/foo%20bar%C3%A9")
js = ''.join('<script src="' + escape.xhtml_escape(p) +
'" type="text/javascript"></script>'
for p in paths)
- sloc = html.rindex(b('</body>'))
- html = html[:sloc] + utf8(js) + b('\n') + html[sloc:]
+ sloc = html.rindex(b'</body>')
+ html = html[:sloc] + utf8(js) + b'\n' + html[sloc:]
if js_embed:
- js = b('<script type="text/javascript">\n//<![CDATA[\n') + \
- b('\n').join(js_embed) + b('\n//]]>\n</script>')
- sloc = html.rindex(b('</body>'))
- html = html[:sloc] + js + b('\n') + html[sloc:]
+ js = b'<script type="text/javascript">\n//<![CDATA[\n' + \
+ b'\n'.join(js_embed) + b'\n//]]>\n</script>'
+ sloc = html.rindex(b'</body>')
+ html = html[:sloc] + js + b'\n' + html[sloc:]
if css_files:
paths = []
unique_paths = set()
css = ''.join('<link href="' + escape.xhtml_escape(p) + '" '
'type="text/css" rel="stylesheet"/>'
for p in paths)
- hloc = html.index(b('</head>'))
- html = html[:hloc] + utf8(css) + b('\n') + html[hloc:]
+ hloc = html.index(b'</head>')
+ html = html[:hloc] + utf8(css) + b'\n' + html[hloc:]
if css_embed:
- css = b('<style type="text/css">\n') + b('\n').join(css_embed) + \
- b('\n</style>')
- hloc = html.index(b('</head>'))
- html = html[:hloc] + css + b('\n') + html[hloc:]
+ css = b'<style type="text/css">\n' + b'\n'.join(css_embed) + \
+ b'\n</style>'
+ hloc = html.index(b'</head>')
+ html = html[:hloc] + css + b'\n' + html[hloc:]
if html_heads:
- hloc = html.index(b('</head>'))
- html = html[:hloc] + b('').join(html_heads) + b('\n') + html[hloc:]
+ hloc = html.index(b'</head>')
+ html = html[:hloc] + b''.join(html_heads) + b'\n' + html[hloc:]
if html_bodies:
- hloc = html.index(b('</body>'))
- html = html[:hloc] + b('').join(html_bodies) + b('\n') + html[hloc:]
+ hloc = html.index(b'</body>')
+ html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:]
self.finish(html)
def render_string(self, template_name, **kwargs):
if self.application._wsgi:
raise Exception("WSGI applications do not support flush()")
- chunk = b("").join(self._write_buffer)
+ chunk = b"".join(self._write_buffer)
self._write_buffer = []
if not self._headers_written:
self._headers_written = True
else:
for transform in self._transforms:
chunk = transform.transform_chunk(chunk, include_footers)
- headers = b("")
+ headers = b""
# Ignore the chunk and only write the headers for HEAD requests
if self.request.method == "HEAD":
lines = [utf8(self.request.version + " " +
str(self._status_code) +
" " + reason)]
- lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, v in
+ lines.extend([(utf8(n) + b": " + utf8(v)) for n, v in
itertools.chain(self._headers.items(), self._list_headers)])
if hasattr(self, "_new_cookie"):
for cookie in self._new_cookie.values():
lines.append(utf8("Set-Cookie: " + cookie.OutputString(None)))
- return b("\r\n").join(lines) + b("\r\n\r\n")
+ return b"\r\n".join(lines) + b"\r\n\r\n"
def _log(self):
"""Logs the current request.
def transform_first_chunk(self, status_code, headers, chunk, finishing):
if 'Vary' in headers:
- headers['Vary'] += b(', Accept-Encoding')
+ headers['Vary'] += b', Accept-Encoding'
else:
- headers['Vary'] = b('Accept-Encoding')
+ headers['Vary'] = b'Accept-Encoding'
if self._gzipping:
ctype = _unicode(headers.get("Content-Type", "")).split(";")[0]
self._gzipping = (ctype in self.CONTENT_TYPES) and \
# Don't write out empty chunks because that means END-OF-STREAM
# with chunked encoding
if block:
- block = utf8("%x" % len(block)) + b("\r\n") + block + b("\r\n")
+ block = utf8("%x" % len(block)) + b"\r\n" + block + b"\r\n"
if finishing:
- block += b("0\r\n\r\n")
+ block += b"0\r\n\r\n"
return block
timestamp = utf8(str(int(time.time())))
value = base64.b64encode(utf8(value))
signature = _create_signature(secret, name, value, timestamp)
- value = b("|").join([value, timestamp, signature])
+ value = b"|".join([value, timestamp, signature])
return value
def decode_signed_value(secret, name, value, max_age_days=31):
if not value:
return None
- parts = utf8(value).split(b("|"))
+ parts = utf8(value).split(b"|")
if len(parts) != 3:
return None
signature = _create_signature(secret, name, parts[0], parts[1])
# here instead of modifying _cookie_signature.
gen_log.warning("Cookie timestamp in future; possible tampering %r", value)
return None
- if parts[1].startswith(b("0")):
+ if parts[1].startswith(b"0"):
gen_log.warning("Tampered cookie %r", value)
return None
try:
def _on_frame_type(self, byte):
frame_type = ord(byte)
if frame_type == 0x00:
- self.stream.read_until(b("\xff"), self._on_end_delimiter)
+ self.stream.read_until(b"\xff", self._on_end_delimiter)
elif frame_type == 0xff:
self.stream.read_bytes(1, self._on_length_indicator)
else:
if isinstance(message, unicode):
message = message.encode("utf-8")
assert isinstance(message, bytes_type)
- self.stream.write(b("\x00") + message + b("\xff"))
+ self.stream.write(b"\x00" + message + b"\xff")
def write_ping(self, data):
"""Send ping frame."""
sha1 = hashlib.sha1()
sha1.update(tornado.escape.utf8(
self.request.headers.get("Sec-Websocket-Key")))
- sha1.update(b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) # Magic value
+ sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value
return tornado.escape.native_str(base64.b64encode(sha1.digest()))
def _accept_connection(self):
"""Closes the WebSocket connection."""
if not self.server_terminated:
if not self.stream.closed():
- self._write_frame(True, 0x8, b(""))
+ self._write_frame(True, 0x8, b"")
self.server_terminated = True
if self.client_terminated:
if self._waiting is not None:
app_response = self.wsgi_application(
WSGIContainer.environ(request), start_response)
response.extend(app_response)
- body = b("").join(response)
+ body = b"".join(response)
if hasattr(app_response, "close"):
app_response.close()
if not data:
parts = [escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")]
for key, value in headers:
- parts.append(escape.utf8(key) + b(": ") + escape.utf8(value) + b("\r\n"))
- parts.append(b("\r\n"))
+ parts.append(escape.utf8(key) + b": " + escape.utf8(value) + b"\r\n")
+ parts.append(b"\r\n")
parts.append(body)
- request.write(b("").join(parts))
+ request.write(b"".join(parts))
request.finish()
self._log(status_code, request)